In [10]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

import pathlib

from tqdm import tqdm

import re

import usaddress

from difflib import get_close_matches as clmatch

import geopandas as gpd

from shapely import Point

import math

import logging

path = pathlib.Path().resolve()

data_path = path.parent / "Dropbox" / "2019 MV Data by Town" / "Vehicles_2022" / "Compiled"

raw_data = pd.read_csv(data_path / "2019-21_data_compiled_RN_100323.csv", chunksize = 1000)

pd.options.mode.chained_assignment = None  # default='warn'

In [11]:
logging.basicConfig(filename = 'municipal_data_prep.log', level = logging.INFO,  format='%(levelname)s: %(message)s')

# Full processing

## Prepare data and import matching sets

In [12]:
# Load NHTSA data
nhtsa_cleaned = pd.read_csv(path / "ignored-data" / "NHTSA_cleaned.csv")

# Simplify the cleaned file
nhtsa_cleaned_simple = nhtsa_cleaned[["VIN", "Manufacturer", "Model", "ModelYear", "FuelTypePrimary", "ElectrificationLevel"]]
nhtsa_cleaned_simple = nhtsa_cleaned_simple.rename(columns = {"VIN":"vin_corrected",
                                                              "Manufacturer" : "Manufacturer Name",
                                                              "ModelYear" : "Model Year",
                                                              "FuelTypePrimary" : "Fuel Type - Primary",
                                                              "ElectrificationLevel" : "Electrification Level"})

  nhtsa_cleaned = pd.read_csv(path / "ignored-data" / "NHTSA_cleaned.csv")


## Required functions

In [None]:
class ChunkProcessor():
    def __init__(self, chunks, processed_chunk_path, number_to_run, zip_processor, vin_matcher, zip_column_name, vin_column_name):
        self.chunks = chunks
        self.processed_chunks = pd.DataFrame([])
        self.processed_chunks_path = processed_chunk_path
        self.number_to_run = number_to_run
        self.chunk_number = 0
        self.zip_processor = zip_processor
        self.vin_matcher = vin_matcher
        self.zip_column_name = zip_column_name
        self.vin_column_name = vin_column_name
        
    def run(self):
        for chunk in self.chunks:
            if self.chunk_number < self.number_to_run:
                chunk_processed = self.process_chunk(chunk)
            else:
                logging.info(f"Concluding on chunk number {self.chunk_number}")
    
    def process_chunk(self, chunk):
        try:
            # Give progress
            if self.chunk_number % 1 == 0:
                print(f"Currently processing chunk number {self.chunk_number}")
            
            # Log progress
            logging.info(f"Chunk Number {self.chunk_number}: commencing processing")
            logging.info(f"Chunk Number {self.chunk_number}: chunk length is {len(chunk)}")
        
            # Reduce reduce the number of columns
            chunk_simplified = chunk[['record_from', 'name', 'street', 'city', 
                                  'state', 'zip', 'vehicle_year', 'vehicle_make', 'vehicle_model',
                                  'vehicle_class', 'vehicle_id']].reset_index(drop = True)
        
            # Correct the zip codes
            corrected_zip_codes = self.zip_processor.get_valid_zips(chunk_simplified, self.zip_column_name).reset_index(drop = True)

            # Get VIN codes
            matched_vins = self.vin_matcher.match_vins(chunk, self.vin_column_name).reset_index(drop=True)

            # Merge
            chunk_processed = chunk_simplified.join(corrected_zip_codes).reset_index(drop=True)
            chunk_processed = chunk_processed.join(matched_vins)
            
            # Aggregate and save
            self.aggregate_save_chunk(chunk_processed)

            # Update chunk number
            self.chunk_number +=1
            
            return chunk_processed
        except Exception as e:
            logging.error(f"Error encountered on chunk {i}, this means the last chunk to be run was chunk {i-1}")
            logging.error(e)
            print(f"Error encountered on chunk {i}, this means the last chunk to be run was chunk {i-1}")
            print(e)
        
    def aggregate_save_chunk(self, processed_chunk):
        # Add it to the master DF
        self.processed_chunks = pd.concat([self.processed_chunks, processed_chunk])
        
        if self.chunk_number % 10 == 0:
            logging.info(f"Saving aggregated chunks numbers {self.chunk_number-100} - {self.chunk_number}")
            self.processed_chunks.to_csv(self.processed_chunks_path / f"chunk_number_{self.chunk_number}.csv")
            
            # Once saved, reset the DF
            self.processed_chunks = pd.DataFrame([])

In [14]:
class ZIPProcessor():
    def __init__(self):
        pass
    
    def check_valid_zip(self, zip_code):
        zip_str = str(zip_code)
        split_zip = re.split("-", zip_str)
        if len(split_zip) == 2:
            if self.check_valid_zip(split_zip[0]) & self.check_valid_zip(split_zip[1]):
                return 2
            elif self.check_valid_zip(split_zip[0]):
                return 3
            else:
                return 0
        elif len(split_zip) == 1:
            # MUST ADDRESS STARTING "Os"
            # starting_o = re.match('^O', split_zip[0])
            matched = re.match("^\s*[0-9]*[0-9]{4}\.?0?\s*$", split_zip[0])
            if matched:
                return 1
            else:
                return 0
        else:
            return 0
        
    def get_valid_zips(self, zip_df, zip_column_name):
        """
        Inputs: A DataFrame with a column called "zip"
        Returns: A DataFrame of the same length as the input, with three columns: zip, zip_valid_code, and zip_corrected
        """
        # Prepare the list to be used
        zip_list = zip_df[[zip_column_name]].rename(columns = {zip_column_name : "zip"}).reset_index(drop = True)

        # Get validity code
        zip_list.loc[:, "zip_valid_code"] = zip_list.loc[:, "zip"].apply(lambda x: self.check_valid_zip(x))

        # Get indices
        correct_zips_indices = zip_list[zip_list["zip_valid_code"]==1].index
        invalid_zips_indices = zip_list[zip_list["zip_valid_code"]==0].index
        two_part_zips_indices = zip_list[zip_list["zip_valid_code"]>1].index

        zip_list.loc[correct_zips_indices, "zip_corrected"] = zip_list.loc[correct_zips_indices, "zip"]
        zip_list.loc[invalid_zips_indices, "zip_corrected"] = np.NaN
        zip_list.loc[two_part_zips_indices, "zip_corrected"] = zip_list.loc[two_part_zips_indices, "zip"].astype(str).str[0:5]

        return zip_list[["zip_corrected"]]

In [15]:
class VINMatcher():
    def __init__(self, initial_matching_list):
        self.matching_list = initial_matching_list
        self.queries = 0
        
    def valid_vins(self, vins_to_correct_df, vin_column):
        """
        Input: A DataFrame containing a "vehicle_id" column of VINs to be corrected
        Output: A DataFrame with two columns: "vehicle_id" and "vin_corrected"
        """
        # Set up
        vin_list = vins_to_correct_df
        
        # Check all alphanumerics
        vin_list.loc[:, "vin_alnum_check"] = vin_list[vin_column].str.strip().str.isalnum()
        vin_list.loc[:, "vin_len_check"] = vin_list[vin_column].str.len() >= 11
        vin_list.loc[:, "vin_check"] = vin_list["vin_alnum_check"] & vin_list["vin_len_check"]

        # Strip the vins
        vin_list["vin_stripped"] = vin_list[vin_column].str.strip()

        # Create 11-long vins
        vin_list.loc[:, "prepared_vins"] = vin_list[vin_column].str[0:8]+"*"+vin_list[vin_column].str[9:11]
        vin_list.loc[vin_list[vin_list["vin_check"]==False].index, "vin_corrected"] = np.NaN
        vin_list.loc[vin_list[vin_list["vin_check"]==True].index, "vin_corrected"] = vin_list.loc[vin_list[vin_list["vin_check"]==True].index, "prepared_vins"]

        # Clean up
        vin_list = vin_list.drop("prepared_vins", axis = 1)
        vin_list = vin_list[["vin_corrected"]]

        # Return
        return vin_list
        
    def match_vins(self, df, vin_column):
        """
        Input: A df containing vin columns, that are then corrected, and matched
        Returns: matched vins, updated matching list    
        """
        # Get a list of valid VINs
        valid_vin_list = self.valid_vins(df[[vin_column]], vin_column)
        
        # Attempt a match
        match = valid_vin_list.merge(self.matching_list,
                                     left_on = "vin_corrected",
                                     right_on = "vin_corrected",
                                     how = 'left')
        # Get unique unmatched vins
        unmatched_vins = list(match[match["Manufacturer Name"].isna()]["vin_corrected"].unique())

        # Print how many
        logging.info(f"VIN matching: a total of {len(unmatched_vins)} VINs were not matched")

        # Variables to download
        variables = ["Manufacturer Name", "Model", "Model Year", "Fuel Type - Primary", "Electrification Level"]
        
        # Go get them
        for vin in tqdm(unmatched_vins):
            try:
                # Try to fetch the unmatched vin
                resp_df = self.fetch_unmatched_vin(vin).reset_index(drop=True)
                for variable in variables:
                    match.loc[match[match["vin_corrected"]==vin].index, match.columns.isin([variable])] = resp_df[variable][0]
                
            except BaseException as e:
                # print(e)
                logging.info(e)
                pass

        remaining_unmatched = list(match[match["Manufacturer Name"].isna()]["vin_corrected"].unique())
        
        logging.info(f"VIN Matching: this number of unmatched VINs was reduced by {len(unmatched_vins) - len(remaining_unmatched)}")
        logging.info(f"VIN Matching: remaining unmatched VINs is {len(remaining_unmatched)}")

        return match
    
    def fetch_unmatched_vin(self, unmatched_vin):
        """
        Input: An unmatched, but corrected VIN
        Output: A matched VIN or NA
        
        """
        # Increment the number of times queried
        self.queries +=1
        
        variables = ["Manufacturer Name", "Model", "Model Year", "Fuel Type - Primary", "Electrification Level"]
        
        url = (f"https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVin/{unmatched_vin.strip()}?format=csv")

        # Download response
        resp_df = pd.read_csv(url)

        # Extract needed
        resp_df = resp_df.loc[resp_df["variable"].isin(variables), ["variable", "value"]].T
        resp_df.columns = resp_df.iloc[0]
        resp_df = resp_df.drop("variable", axis = 0)
        resp_df["vin_corrected"] = unmatched_vin
        valid_response = not(resp_df["Fuel Type - Primary"].isna()[0])
        
        # Update the matching list
        self.matching_list = pd.concat([self.matching_list, resp_df]).reset_index(drop = True)
        
        # If the number of queries is an increment of 100, save the matching list        
        if self.queries % 10 == 0:
            logging.info(f"Saving matching list after {self.queries} queries")
            self.matching_list.to_csv(path / "matching_list.csv")
        
        return resp_df

# Test Function

In [16]:
processed_chunks_path = path/"processed_chunks"
number_to_run = 30

In [17]:
chunks = pd.read_csv(data_path / "2019-21_data_compiled_RN_100323.csv", chunksize = 10)

In [18]:
zp = ZIPProcessor()
vm = VINMatcher(nhtsa_cleaned_simple)
cp = ChunkProcessor(chunks, processed_chunks_path, number_to_run, zp, vm, "zip", "vehicle_id")

In [19]:
cp.run()

Currently processing chunk number 0


100%|██████████| 4/4 [00:01<00:00,  2.40it/s]


Currently processing chunk number 1


100%|██████████| 6/6 [00:02<00:00,  2.97it/s]


Currently processing chunk number 2


100%|██████████| 6/6 [00:02<00:00,  2.70it/s]


Currently processing chunk number 3


100%|██████████| 7/7 [00:02<00:00,  2.36it/s]


Currently processing chunk number 4


100%|██████████| 4/4 [00:01<00:00,  3.31it/s]


Currently processing chunk number 5


100%|██████████| 6/6 [00:02<00:00,  2.37it/s]


Currently processing chunk number 6


100%|██████████| 3/3 [00:01<00:00,  2.99it/s]


Currently processing chunk number 7


100%|██████████| 7/7 [00:02<00:00,  2.81it/s]


Currently processing chunk number 8


100%|██████████| 10/10 [00:03<00:00,  2.56it/s]


Currently processing chunk number 9


100%|██████████| 9/9 [00:03<00:00,  2.32it/s]


Currently processing chunk number 10


100%|██████████| 3/3 [00:00<00:00,  3.26it/s]


Currently processing chunk number 11


100%|██████████| 7/7 [00:02<00:00,  2.56it/s]


Currently processing chunk number 12


100%|██████████| 8/8 [00:03<00:00,  2.09it/s]


Currently processing chunk number 13


100%|██████████| 4/4 [00:01<00:00,  2.66it/s]


Currently processing chunk number 14


100%|██████████| 8/8 [00:02<00:00,  2.94it/s]


Currently processing chunk number 15


100%|██████████| 6/6 [00:02<00:00,  2.75it/s]


Currently processing chunk number 16


100%|██████████| 7/7 [00:03<00:00,  2.31it/s]


Currently processing chunk number 17


100%|██████████| 3/3 [00:00<00:00,  3.47it/s]


Currently processing chunk number 18


100%|██████████| 5/5 [00:01<00:00,  2.79it/s]


Currently processing chunk number 19


100%|██████████| 10/10 [00:04<00:00,  2.45it/s]


Currently processing chunk number 20


100%|██████████| 7/7 [00:02<00:00,  2.65it/s]


Currently processing chunk number 21


100%|██████████| 7/7 [00:02<00:00,  3.40it/s]


Currently processing chunk number 22


100%|██████████| 7/7 [00:02<00:00,  2.82it/s]


Currently processing chunk number 23


100%|██████████| 6/6 [00:02<00:00,  2.64it/s]


Currently processing chunk number 24


100%|██████████| 9/9 [00:02<00:00,  3.09it/s]


Currently processing chunk number 25


100%|██████████| 2/2 [00:00<00:00,  2.20it/s]


Currently processing chunk number 26


100%|██████████| 9/9 [00:02<00:00,  3.25it/s]


Currently processing chunk number 27


100%|██████████| 5/5 [00:01<00:00,  3.02it/s]


Currently processing chunk number 28


100%|██████████| 8/8 [00:02<00:00,  3.01it/s]


Currently processing chunk number 29


100%|██████████| 6/6 [00:02<00:00,  2.80it/s]


KeyboardInterrupt: 