In [1]:
## ISyE6740 - Computational Data Analytics
## Course Project
## Data Preparation - Stage 1
##
## -  This script reads datasets from CSV files, then saves them to
##    parquet files for speed.
## -  It is also responsible for creating a data sample for visualization
##    setup, because Tableau is so slow with original data.
## -  Metadata for all datasets is loaded and some statistics to aid in
##    later tasks are generated.
## -  Data reduction is performed to reduce number of features available
##    for analysis stage. At this stage, mostly useless features are removed.

In [2]:
## imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
import os
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

In [3]:
## PART 1
## Convert CSV to Parquet files, if Parquet files do not exist
data_files = pd.DataFrame.from_dict({
    'tableName': ['CRASHES',
                  'VEHICLES',
                  'PEOPLE'],
    'csv_file': [r'.\data\csv\chicago-crashes-20240416.csv',
                 r'.\data\csv\chicago-vehicles-20240416.csv',
                 r'.\data\csv\chicago-people-20240416.csv'],
    'metadata_file': [r'.\data\metadata\chicago-crashes-20240416.json',
                      r'.\data\metadata\chicago-vehicles-20240416.json',
                      r'.\data\metadata\chicago-people-20240416.json'],
    'csv_sample_file': [r'.\data\sample\chicago-crashes-sample.csv',
                        r'.\data\sample\chicago-vehicles-sample.csv',
                        r'.\data\sample\chicago-people-sample.csv'],
    'parquet_file': [r'.\data\parquet\chicago-crashes-20240416.parquet.gzip',
                     r'.\data\parquet\chicago-vehicles-20240416.parquet.gzip',
                     r'.\data\parquet\chicago-people-20240416.parquet.gzip'],
    'reduced_st1': [r'.\data\reduced\stage1\chicago-crashes-reduced-st1.parquet.gzip',
                    r'.\data\reduced\stage1\chicago-vehicles-reduced-st1.parquet.gzip',
                    r'.\data\reduced\stage1\chicago-people-reduced-st1.parquet.gzip']
})

for _, r in data_files.iterrows():
    if not os.path.isfile(r['parquet_file']):
        print(f"NOT FOUND: {r['parquet_file']}.\nCREATING: {r['parquet_file']}...", end="")
        df = pd.read_csv(r['csv_file'], low_memory=False)
        df.to_parquet(r['parquet_file'], index=False, compression='gzip', engine='auto')
        print("DONE!\n")
    else:
        print(f"FOUND: {r['parquet_file']}.\nMoving on...\n")

print("Conversion complete!")

FOUND: .\data\parquet\chicago-crashes-20240416.parquet.gzip.
Moving on...

FOUND: .\data\parquet\chicago-vehicles-20240416.parquet.gzip.
Moving on...

FOUND: .\data\parquet\chicago-people-20240416.parquet.gzip.
Moving on...

Conversion complete!


In [4]:
## PART 2
## Load Metadata
metadata_df = pd.DataFrame(columns=['tableName', 'name', 'dataTypeName', 'description'])

for _, r in data_files.iterrows():
    with open(r['metadata_file'], errors='ignore') as f:
        metadata_dict = json.load(f)
        df = pd.DataFrame(metadata_dict['columns'])[['name', 'dataTypeName', 'description']]
        df.insert(loc=0, column='tableName', value=r['tableName'])
        
        metadata_df = pd.concat([metadata_df, df])

metadata_df = metadata_df.reset_index(drop=False, names='column_index') \
                [['tableName', 'column_index', 'name', 'dataTypeName', 'description']]

metadata_df.insert(loc=4, column='count', value=0)
metadata_df['count'] = metadata_df['count'].astype(int)

metadata_df.insert(loc=5, column='nulls', value=0)
metadata_df['count'] = metadata_df['nulls'].astype(int)

metadata_df.insert(loc=6, column='unique', value=0)
metadata_df['count'] = metadata_df['unique'].astype(int)

print('Metadata loaded successfully...')
# metadata_df

Metadata loaded successfully...


In [5]:
## PART 3
## Load Data from Parquet Files
## Update data statistics
data_frames={}
for _, r in data_files.iterrows():
    print(f"PROCESSING: {r['tableName']}...")
    df = pd.read_parquet(r['parquet_file'], engine='auto').fillna(value=np.nan)

    # count
    metadata_df.loc[metadata_df['tableName']==r['tableName'], 'count']=df.shape[0]

    # nulls
    df_nulls = pd.DataFrame(df.isnull().sum(), columns=['nulls']).reset_index(drop=False, names='name')
    df_nulls['tableName'] = r['tableName']
    metadata_df = pd.merge(left=metadata_df, right=df_nulls, how='left', suffixes=['', '_'], on=('tableName', 'name')).ffill()
    metadata_df.loc[metadata_df['tableName']==r['tableName'], 'nulls'] = metadata_df.loc[metadata_df['tableName']==r['tableName'], 'nulls_'].fillna(0).astype(int)
    # metadata_df.update(df_nulls)

    # unique elements
    df_unique =pd.DataFrame(df.nunique(), columns=['unique']).reset_index(drop=False, names='name')
    df_unique['tableName'] = r['tableName']
    metadata_df = pd.merge(left=metadata_df, right=df_unique, how='left', suffixes=['', '_'], on=('tableName', 'name')).ffill()
    metadata_df.loc[metadata_df['tableName']==r['tableName'], 'unique'] = metadata_df.loc[metadata_df['tableName']==r['tableName'], 'unique_'].fillna(0).astype(int)
    # metadata_df['unique'] = metadata_df['unique_'].fillna(0).astype(int)

    metadata_df = metadata_df.drop(['nulls_', 'unique_'], axis=1)
    # metadata_df.update(df_unique)    

    data_frames[r['tableName']] = df.copy()

# Add new statistics
metadata_df.insert(loc=7, column='percent_null', value=metadata_df['nulls']/metadata_df['count'])
metadata_df.insert(loc=7, column='percent_unique', value=metadata_df['unique']/metadata_df['count'])

metadata_df.to_excel(r'.\data\chicago-dataset-summary.xlsx', index=False)
print("\nData loaded successfully...")
# metadata_df

PROCESSING: CRASHES...
PROCESSING: VEHICLES...
PROCESSING: PEOPLE...

Data loaded successfully...


In [7]:
## PART 4
## Check all samples. If any of them does not exist, regenerate all
sample_size=0.1
sample_data_frames={}

files_found = 0
for _, r in data_files.iterrows():
    files_found += os.path.isfile(r['csv_sample_file'])

if files_found == len(data_files['csv_sample_file']):
    print("All samples found!")
else:
    print(f"Regenerating samples using {sample_size*100:0.0f}% of data...\n")
    
    # Identify common ID's in the sample
    df = data_frames['CRASHES']['CRASH_RECORD_ID'].sample(frac=sample_size)
    df = pd.merge(left=df, right=data_frames['VEHICLES'], on='CRASH_RECORD_ID', how='inner').drop_duplicates(subset=['CRASH_RECORD_ID'])['CRASH_RECORD_ID']
    df = pd.merge(left=df, right=data_frames['PEOPLE'], on='CRASH_RECORD_ID', how='inner').drop_duplicates(subset=['CRASH_RECORD_ID'])['CRASH_RECORD_ID']
    
    # sample!
    for _, r in data_files.iterrows():
        print(f"Writing: {r['csv_sample_file']}")
        sample_data_frames[r['tableName']] = pd.merge(left=df, right=data_frames[r['tableName']], on='CRASH_RECORD_ID', how='inner')
        sample_data_frames[r['tableName']].to_csv(r['csv_sample_file'])

All samples found!


In [8]:
data_frames['CRASHES'].shape

(823577, 49)

In [60]:
## PART 5
## Data Reduction
## 1. Drop columns with ~100% nulls
null_cols1 = {'CRASHES': ['WORKERS_PRESENT_I',         'DOORING_I',            'WORK_ZONE_TYPE',
                          'WORK_ZONE_I',               'PHOTOS_TAKEN_I',       'STATEMENTS_TAKEN_I',
                          'NOT_RIGHT_OF_WAY_I',        'CRASH_DATE_EST_I'],
              
              'VEHICLES': ['HAZMAT_REPORT_NO',         'MCS_REPORT_NO',        'HAZMAT_NAME',
                           'TRAILER2_LENGTH',          'WIDE_LOAD_I',          'HAZMAT_PLACARDS_I',
                           'TRAILER2_WIDTH',           'UN_NO',                'IDOT_PERMIT_NO',
                           'HAZMAT_CLASS',             'FIRE_I',               'ILCC_NO',
                           'CCMC_NO',                  'TRAILER1_LENGTH',      'EXCEED_SPEED_LIMIT_I',
                           'TOTAL_VEHICLE_LENGTH',     'TRAILER1_WIDTH',       'AXLE_CNT',
                           'GVWR',                     'USDOT_NO',             'COMMERCIAL_SRC',
                           'HAZMAT_OUT_OF_SERVICE_I',  'MCS_OUT_OF_SERVICE_I', 'MCS_VIO_CAUSE_CRASH_I',
                           'HAZMAT_REPORT_I',          'MCS_REPORT_I',         'HAZMAT_VIO_CAUSE_CRASH_I',
                           'HAZMAT_PRESENT_I',         'LOAD_TYPE',            'CARGO_BODY_TYPE',
                           'VEHICLE_CONFIG',           'CARRIER_CITY',         'CARRIER_STATE',
                           'CARRIER_NAME',             'CMV_ID',               'CMRC_VEH_I',
                           'AREA_00_I',                'TOWED_TO',             'AREA_09_I',
                           'TOWED_BY',                 'AREA_04_I',            'AREA_03_I',
                           'AREA_10_I'],
              
              'PEOPLE': ['CELL_PHONE_USE',             'BAC_RESULT VALUE',     'EMS_RUN_NO',
                         'PEDPEDAL_VISIBILITY',        'PEDPEDAL_LOCATION',    'PEDPEDAL_ACTION']
}

for t in null_cols1:
    data_frames[t].drop(null_cols1[t], axis=1, inplace=True)

## 2. Drop columns with 80%~99% nulls
null_cols2 = {'CRASHES': ['INTERSECTION_RELATED_I',    'LANE_CNT'],
              
              'VEHICLES': ['AREA_99_I',                'AREA_08_I',            'AREA_07_I',
                           'AREA_05_I',                'AREA_06_I',            'AREA_02_I',
                           'AREA_12_I',                'AREA_11_I',            'AREA_01_I'],
              
              'PEOPLE': ['EMS_AGENCY']
}

for t in null_cols2:
    data_frames[t].drop(null_cols2[t], axis=1, inplace=True)

print("Writing stage 1 results...")
for _, r in data_files.iterrows():
    if not os.path.isfile(r['reduced_st1']):
        print(f"NOT FOUND: {r['reduced_st1']}.\nCREATING: {r['reduced_st1']}...", end="")
        data_frames[r['tableName']].to_parquet(r['reduced_st1'], index=False, compression='gzip', engine='auto')
        print("DONE!\n")
    else:
        print(f"FOUND: {r['reduced_st1']}.\nMoving on...\n")

print("Data reduction completed successfully...")

KeyError: "['WORKERS_PRESENT_I', 'DOORING_I', 'WORK_ZONE_TYPE', 'WORK_ZONE_I', 'PHOTOS_TAKEN_I', 'STATEMENTS_TAKEN_I', 'NOT_RIGHT_OF_WAY_I', 'CRASH_DATE_EST_I'] not found in axis"

In [68]:
print("Writing stage 1 results...")
for _, r in data_files.iterrows():
    if not os.path.isfile(r['reduced_st1']):
        print(f"NOT FOUND: {r['reduced_st1']}.\nCREATING: {r['reduced_st1']}...", end="")
        data_frames[r['tableName']].to_parquet(r['reduced_st1'], index=False, compression='gzip', engine='auto')
        print("DONE!\n")
    else:
        print(f"FOUND: {r['reduced_st1']}.\nMoving on...\n")

print("Data reduced successfully...")
# data_frames['VEHICLES'][data_frames['VEHICLES']['UNIT_TYPE']!='DRIVER'].head().T
# data_frames['VEHICLES'][data_frames['VEHICLES']['UNIT_TYPE']=='BICYCLE']
# data_frames['VEHICLES'][data_frames['VEHICLES']['UNIT_TYPE'].isnull()].head().T
cid = 'f59f115cb6eda57e929debe2c17026ca17bce6ad0d9842cdcec64f1779dada0c32925d2209fe9ac280cc57acafb6462896470052d27d63d2eae5c8e7e36fdb06'
data_frames['PEOPLE'][data_frames['PEOPLE']['ZIPCODE'].isnull() == True].head().T

Writing stage 1 results...
NOT FOUND: .\data\reduced\stage1\chicago-crashes-reduced-st1.parquet.gzip.
CREATING: .\data\reduced\stage1\chicago-crashes-reduced-st1.parquet.gzip...DONE!

NOT FOUND: .\data\reduced\stage1\chicago-vehicles-reduced-st1.parquet.gzip.
CREATING: .\data\reduced\stage1\chicago-vehicles-reduced-st1.parquet.gzip...DONE!

NOT FOUND: .\data\reduced\stage1\chicago-people-reduced-st1.parquet.gzip.
CREATING: .\data\reduced\stage1\chicago-people-reduced-st1.parquet.gzip...DONE!


Data reduced successfully...


Unnamed: 0,2,3,4,5,6
PERSON_ID,O10018,O10038,O10039,O10041,O10062
PERSON_TYPE,DRIVER,DRIVER,DRIVER,DRIVER,DRIVER
CRASH_RECORD_ID,71162af7bf22799b776547132ebf134b5b438dcf3dac6b...,c21c476e2ccc41af550b5d858d22aaac4ffc88745a1700...,eb390a4c8e114c69488f5fb8a097fe629f5a92fd528cf4...,dd1bce4bd6d0be4c247714dcabab44e6563c62b913229b...,4bd2ee6bb306902b99a9c2ae55cf4fcffec00879e39759...
VEHICLE_ID,9579.0,9598.0,9600.0,9601.0,9621.0
CRASH_DATE,11/01/2015 05:00:00 AM,11/01/2015 08:00:00 AM,11/01/2015 10:15:00 AM,11/01/2015 11:00:00 AM,11/01/2015 12:30:00 PM
SEAT_NO,,,,,
CITY,,,,,
STATE,,,,,
ZIPCODE,,,,,
SEX,X,X,X,X,X


In [59]:
## Part 3: EDA
data_frames['CRASHES'].describe(include='all').T

Unnamed: 0,count,unique,top,freq,mean,std,min,25%,50%,75%,max
CRASH_RECORD_ID,823577.0,823577.0,1e9318e69bbdc24db5137569e760a6a8feeb1ccce86e88...,1.0,,,,,,,
CRASH_DATE,823577.0,540963.0,12/29/2020 05:00:00 PM,30.0,,,,,,,
POSTED_SPEED_LIMIT,823577.0,,,,28.407103,6.161772,0.0,30.0,30.0,30.0,99.0
TRAFFIC_CONTROL_DEVICE,823577.0,19.0,NO CONTROLS,468070.0,,,,,,,
DEVICE_CONDITION,823577.0,8.0,NO CONTROLS,473497.0,,,,,,,
WEATHER_CONDITION,823577.0,12.0,CLEAR,645355.0,,,,,,,
LIGHTING_CONDITION,823577.0,6.0,DAYLIGHT,526779.0,,,,,,,
FIRST_CRASH_TYPE,823577.0,18.0,PARKED MOTOR VEHICLE,191357.0,,,,,,,
TRAFFICWAY_TYPE,823577.0,20.0,NOT DIVIDED,357372.0,,,,,,,
LANE_CNT,199010.0,,,,13.330109,2961.593711,0.0,2.0,2.0,4.0,1191625.0


In [24]:
data_frames['PEOPLE'][data_frames['PEOPLE']['AGE'].isnull()].head().T

Unnamed: 0,2,3,4,5,6
PERSON_ID,O10018,O10038,O10039,O10041,O10062
PERSON_TYPE,DRIVER,DRIVER,DRIVER,DRIVER,DRIVER
CRASH_RECORD_ID,71162af7bf22799b776547132ebf134b5b438dcf3dac6b...,c21c476e2ccc41af550b5d858d22aaac4ffc88745a1700...,eb390a4c8e114c69488f5fb8a097fe629f5a92fd528cf4...,dd1bce4bd6d0be4c247714dcabab44e6563c62b913229b...,4bd2ee6bb306902b99a9c2ae55cf4fcffec00879e39759...
VEHICLE_ID,9579.0,9598.0,9600.0,9601.0,9621.0
CRASH_DATE,11/01/2015 05:00:00 AM,11/01/2015 08:00:00 AM,11/01/2015 10:15:00 AM,11/01/2015 11:00:00 AM,11/01/2015 12:30:00 PM
SEAT_NO,,,,,
CITY,,,,,
STATE,,,,,
ZIPCODE,,,,,
SEX,X,X,X,X,X
