In [None]:
from bs4 import BeautifulSoup
import requests
import json
import re

import pandas as pd

import os

import re

# Get safety data from IIHS.org

In [None]:
def get_vehicle_info(soup):
    """Extract identifying fields for every vehicle listed on a ratings page.

    Every ``<td class="Vehicle">`` cell found in ``soup`` is scanned, and each
    child node of such a cell is read as one vehicle entry: its ``href`` gives
    the detail-page link and the text of its own children gives the award
    label (optional), the make/model and the "style | year" description.

    Parameters
    ----------
    soup : bs4.BeautifulSoup
        Parsed HTML of the page.

    Returns
    -------
    pandas.DataFrame
        One row per entry with the columns ``make``, ``model``, ``style``,
        ``year``, ``iihs_ranking``, ``iihs_url`` and ``other_notes``.

    Raises
    ------
    ValueError
        If an entry's description does not contain exactly one ``|``, or if
        its year part does not start with four digits.

    Notes
    -----
    The module-level ``url_base`` is prepended to each ``href``.
    """
    data_rows = []
    for cell in soup.find_all('td', class_="Vehicle"):
        # Each child of the cell must expose get('href') and .children
        # (presumably an <a> element -- confirm against the live markup).
        for link in cell:
            iihs_url = url_base + link.get('href')
            names = [x.text for x in link.children]
            if 'Safety' in names[0]:
                # First text chunk is an award label; the name follows it.
                make_model = names[1]
                style_years = ' '.join(names[2:])
                iihs_ranking = names[0]
            else:
                make_model = names[0]
                style_years = ' '.join(names[1:])
                iihs_ranking = ''
            (style, year) = style_years.split('|')
            style = style.strip()
            year_str = year.replace('models', '').strip()
            if len(year_str) == 4:
                year = int(year_str)
                other_notes = ""
            else:
                # Anything after the four-digit year is kept as a free-text note.
                year = int(year_str[:4])
                other_notes = year_str[4:]
            make = make_model.split(' ')[0]
            model = ' '.join(make_model.split(' ')[1:])
            if make == "Alfa":
                # Two-word make: move "Romeo" from the model into the make.
                make = "Alfa Romeo"
                model = model.replace("Romeo ", "")
            # append() instead of rebuilding the list on every iteration.
            data_rows.append({'make': make, 'model': model, 'style': style, 'year': year,
                              "iihs_ranking": iihs_ranking, 'iihs_url': iihs_url,
                              'other_notes': other_notes})

    df = pd.DataFrame(data_rows)
    return (df)

In [None]:
def extract_table(url, timeout=30):
    """Download one ratings summary page and return it as a single table.

    The first HTML table on the page is combined column-wise with the
    per-vehicle fields produced by ``get_vehicle_info``; the page's
    "Vehicle Name" column is dropped in favour of those parsed fields.

    Parameters
    ----------
    url : str
        Address of the page. Its last path component is stored in the
        ``iihs_type`` column.
    timeout : float, optional
        Seconds to wait for the server before giving up (default 30), so a
        stalled connection cannot hang the notebook.

    Returns
    -------
    pandas.DataFrame
        Parsed vehicle fields, ``iihs_type``, then the remaining table columns.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status.
    """
    # Local import so the notebook's import cell does not need to change.
    from io import StringIO

    page = requests.get(url, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page.
    page.raise_for_status()
    # Recent pandas deprecates passing literal HTML; a file-like object works everywhere.
    df_raw = pd.read_html(StringIO(page.text))[0]
    soup = BeautifulSoup(page.text, 'html.parser')
    df_id = get_vehicle_info(soup).assign(iihs_type=os.path.basename(url))
    # NOTE(review): the two frames are joined on their default row index, which
    # assumes both list the vehicles in the same order and number -- confirm.
    df = pd.concat([df_id, df_raw.drop(columns="Vehicle Name")], axis=1)
    return (df)

In [None]:
# Root of the IIHS site; also prepended to relative vehicle links elsewhere.
url_base = "https://www.iihs.org"

# Vehicle-class slugs; each one names a class-summary page.
car_types = [
    "small-cars",
    "midsize-cars",
    "midsize-luxury-cars",
    "large-cars",
    "large-luxury-cars",
    "small-suvs",
    "midsize-suvs",
    "midsize-luxury-suvs",
    "large-suvs",
    "minivans",
]

# One summary-page URL per vehicle class, in the same order as car_types.
urls = [url_base + "/ratings/class-summary/" + class_slug for class_slug in car_types]

In [None]:
# Sanity check: show the first generated summary-page URL.
print(urls[0])

In [None]:
# Fetch every class-summary page and stack the per-class tables into one
# frame with a fresh 0..n-1 row index.
car_table = pd.concat(
    [extract_table(class_url) for class_url in urls],
    ignore_index=True,
)

# Get fuel economy data from fueleconomy.gov

In [None]:
# Load the fueleconomy.gov vehicle table straight from the zipped CSV URL.
epadata_vehicles = pd.read_csv("https://www.fueleconomy.gov/feg/epadata/vehicles.csv.zip")

In [None]:
# Display the downloaded table for a quick visual check.
epadata_vehicles

In [None]:
def find_fuelecon_ids(make, model, year, db_full, recursive=True):
    """Find the ids of the rows in ``db_full`` that match one vehicle.

    Rows are first restricted to ``year``. A row matches when its ``make``
    and ``model`` values start with the given names (case-insensitive,
    compared as literal text). When nothing matches, a make-specific fallback
    rebuilds the model criterion (BMW "series" names, Mercedes-Benz "-Class"
    names, multi-word Volvo names, Mini, and names containing "ybrid"). If
    that still finds nothing and ``recursive`` is true, the whole lookup is
    retried once for ``year - 1``.

    Parameters
    ----------
    make : str
        Manufacturer name.
    model : str
        Model name.
    year : int
        Model year to search.
    db_full : pandas.DataFrame
        Table with at least the columns ``year``, ``make``, ``model``, ``id``.
    recursive : bool, optional
        Whether an empty result may fall back to the previous year (default
        True). The retry itself is made with ``recursive=False``.

    Returns
    -------
    list
        Values of the ``id`` column for the matching rows; empty if none.
    """
    if make == "Volvo":
        model = model.replace("Cross Country", "CC")
    # The fallbacks below rewrite `model`; keep the requested name so that the
    # previous-year retry starts from the full name again rather than from a
    # truncated (possibly empty, hence match-everything) remainder.
    requested_model = model

    db = db_full[db_full["year"] == year]
    # re.escape: the names are literal text, not regular expressions.
    criterion1 = db["make"].str.match(re.escape(make), case=False)
    criterion2 = db["model"].str.match(re.escape(model), case=False)
    id_list = db[criterion1 & criterion2]["id"].to_list()
    if len(id_list) == 0:
        if make == "BMW" and 'series' in model:
            # e.g. "3 series ...": match on the leading token, then require
            # every remaining word to appear somewhere in the model value.
            criterion2 = db["model"].str.startswith(model.split(' ')[0])
            model = model.replace(' '.join(model.split(' ')[0:2]), '')
            if ' ' in model:
                for x in model.split(' '):
                    criterion2 = criterion2 & db["model"].str.contains(x, case=False)
        elif make == "Mercedes-Benz":
            model = model.replace('-Class', '')
            # Strip digits and compare the first remaining word with the class
            # letters. regex=True is explicit because pandas >= 2.0 treats the
            # pattern as a literal by default; na=False covers values that
            # are empty once their digits are removed.
            # NOTE(review): this is a prefix match, so a short class name also
            # matches longer ones that start with the same letters -- confirm.
            criterion2 = (
                db["model"].str.replace(r'\d+', '', regex=True)
                .str.split().str.get(0).str.match(model, na=False)
            )
        elif make == "Volvo":
            if ' ' in model:
                # Require every word of the name to appear in the model value.
                criterion2 = True
                for x in model.split(' '):
                    criterion2 = criterion2 & db["model"].str.contains(x, case=False)
        elif make == "Mini":
            criterion2 = db["model"].str.contains(model, case=False)
        elif "ybrid" in model:
            # Same leading name, with "Hybrid" allowed anywhere in the value.
            model_prefix = re.sub("Hybrid", "", model, flags=re.IGNORECASE)
            criterion2 = db["model"].str.contains("Hybrid", case=False) & db["model"].str.startswith(model_prefix)
        id_list = db[criterion1 & criterion2]["id"].to_list()
        if len(id_list) == 0 and recursive:
            id_list = find_fuelecon_ids(make, requested_model, year - 1, db_full, recursive=False)
    return (id_list)

In [None]:
# Look up the matching fuel-economy ids for every vehicle row.
# Fields are read by column label: integer keys on a label-indexed row are
# deprecated positional access in recent pandas.
fueleconomy_ids = car_table[["make", "model", "year"]].apply(
    lambda row: find_fuelecon_ids(
        make=row["make"], model=row["model"], year=row["year"], db_full=epadata_vehicles
    ),
    axis=1,
)

In [None]:
# Attach the per-vehicle id lists as a new column; car_table itself is left untouched.
car_table_fuel = car_table.assign(fuelecon_ids=fueleconomy_ids)

In [None]:
# Display the table with the matched ids for inspection.
car_table_fuel

# Merge, group by and aggregate

In [None]:
# Build the merged table in one chain so that no column is assigned into a
# filtered frame (the original pattern triggers SettingWithCopyWarning).
merged_car_data = (
    car_table_fuel
    # One row per matched id.
    .explode("fuelecon_ids")
    # Left join keeps vehicles without any match; clashing right-hand
    # column names get the "_epa" suffix.
    .merge(epadata_vehicles, how="left", left_on="fuelecon_ids", right_on="id",
           suffixes=('', '_epa'))
    # Keep only rows whose "trany" text mentions "Automatic"; rows with a
    # missing value (including unmatched vehicles) are dropped.
    .loc[lambda d: d["trany"].str.contains("Automatic", na=False)]
    # fuelType is taken from atvType, with missing values labelled "Gasoline".
    # NOTE(review): this replaces any fuelType column already present after
    # the merge -- confirm that is intended.
    .assign(fuelType=lambda d: d["atvType"].fillna("Gasoline"))
    .drop(columns=["atvType", "id"])
)

In [None]:
# Grouping keys: every column of car_table_fuel plus "fuelType", minus the
# id-list column, which is aggregated rather than grouped on.
groupby_cols = [*car_table_fuel.columns, "fuelType"]
groupby_cols.remove("fuelecon_ids")

In [None]:
# int64/float64 columns that are neither grouping keys nor the id column are
# summarised with min, max, mean and median.
# select_dtypes replaces a comparison against `dtype('int64')`, which used a
# bare `dtype` name that none of the visible import lines define.
merged_car_data_numerical_cols = [
    col
    for col in merged_car_data.select_dtypes(include=["int64", "float64"]).columns
    if col not in groupby_cols + ["fuelecon_ids"]
]
numeric_agg_funcs = {
    col: ["min", "max", "mean", "median"] for col in merged_car_data_numerical_cols
}

In [None]:
# Every column that is not int64/float64 (and is neither a grouping key nor
# the id column) is summarised by its mode.
# select_dtypes replaces a comparison against `dtype('int64')`, which used a
# bare `dtype` name that none of the visible import lines define.
merged_car_data_nonnumeric_cols = [
    col
    for col in merged_car_data.select_dtypes(exclude=["int64", "float64"]).columns
    if col not in groupby_cols + ["fuelecon_ids"]
]
nonnumeric_agg_funcs = {col: pd.Series.mode for col in merged_car_data_nonnumeric_cols}

In [None]:
# Full aggregation spec: numeric summaries, modes for the remaining columns,
# and the matched ids of each group collected into a list.
agg_funcs = {
    **numeric_agg_funcs,
    **nonnumeric_agg_funcs,
    "fuelecon_ids": list,
}

In [None]:
# Aggregate per vehicle; dropna=False keeps groups whose key contains a missing value.
merged_car_data_stats = merged_car_data.groupby(groupby_cols, dropna=False).agg(agg_funcs)

# Flatten the (column, function) labels to "column_function", then drop the
# "_mode" / "_list" markers so those columns keep their plain names.
merged_car_data_stats.columns = [
    '_'.join(label_parts).rstrip("_").replace("_mode", "").replace("_list", "")
    for label_parts in merged_car_data_stats.columns
]

In [None]:
# Turn the grouping keys back into ordinary columns and expose the collected
# id lists under the name "id".
merged_car_data_stats = (
    merged_car_data_stats
    .reset_index()
    .rename(columns={"fuelecon_ids": "id"})
)

In [None]:
# Display the aggregated table for inspection.
merged_car_data_stats

# Save to file

In [None]:
# Directory the result files are written to (here: the current directory).
outdir = "./"

In [None]:
# Write the aggregated table to a CSV file in outdir.
merged_car_data_stats.to_csv(os.path.join(outdir, "2022_car_rankings_raw_DO_NOT_EDIT.csv"))

In [None]:
# Write the same table to an Excel workbook in outdir, on the sheet "iihs car ranking".
merged_car_data_stats.to_excel(os.path.join(outdir, "2022_car_rankings.xlsx"), sheet_name="iihs car ranking")