In [22]:
# Import modules
import pandas as pd
import logging

In [23]:
# Input CSV locations, given as relative paths: the crash records file and
# the per-crash vehicle records file.
crash_data_file, vehicle_data_file = (
    "traffic_crashes.csv",
    "traffic_crash_vehicle.csv",
)

In [24]:
# Load both CSV files into dataframes. The path variables already hold plain
# strings, so they are handed to read_csv directly instead of being wrapped in
# a redundant f-string.
df_crashes = pd.read_csv(crash_data_file)
df_vehicles = pd.read_csv(vehicle_data_file)

## Define a Single Function That Runs All the Cleansing and Transformation Steps

In [25]:
def get_transformed_data(crash_file, vehicle_file):
    """Load the crash and vehicle CSVs and count crash records per vehicle type.

    Args:
        crash_file: Source of the crash CSV (anything ``pandas.read_csv``
            accepts). It must provide a ``crash_record_id`` column.
        vehicle_file: Source of the vehicle CSV. It must provide
            ``crash_record_id`` and ``vehicle_type`` columns.

    Returns:
        A dataframe with one row per vehicle type. Column ``vehicletypes``
        holds the type and ``crash_record_id`` holds the count of crash record
        ids merged onto that type.
    """
    # import data -- read the sources named by the arguments, not the
    # notebook-level path variables, so the function works for any input files
    df_crashes = pd.read_csv(crash_file)
    df_vehicles = pd.read_csv(vehicle_file)

    # remove specified missing values: keep only crash rows that have at least
    # two non-null values, then default a missing report type to 'ON SCENE'
    df_crashes = df_crashes.dropna(axis='index', thresh=2)
    df_crashes = df_crashes.fillna(value={'report_type': 'ON SCENE'})

    # merge crashes and vehicles (left join keeps every remaining crash row)
    df = df_crashes.merge(df_vehicles, how='left', on='crash_record_id', suffixes=('_left', '_right'))
    df_agg = df.groupby('vehicle_type').agg({'crash_record_id': 'count'}).reset_index()

    # transform column names for output data
    vehicle_mapping = {'vehicle_type': 'vehicletypes'}
    df_agg = df_agg.rename(columns=vehicle_mapping)

    return df_agg

In [26]:
# Run the transformation on the two input files; as the last expression in the
# cell, the returned dataframe is rendered as the cell output.
get_transformed_data(crash_data_file,vehicle_data_file) 

Unnamed: 0,vehicletypes,crash_record_id
0,BUS OVER 15 PASS.,5
1,MOPED OR MOTORIZED BICYCLE,1
2,OTHER,20
3,OTHER VEHICLE WITH TRAILER,1
4,PASSENGER,633
5,PICKUP,33
6,SINGLE UNIT TRUCK WITH TRAILER,2
7,SPORT UTILITY VEHICLE (SUV),138
8,TRACTOR W/ SEMI-TRAILER,5
9,TRACTOR W/O SEMI-TRAILER,2


### The preceding code can be split into reusable functions that are easy to manage

In [27]:
# Read data from data source 
def read_datasources(source_name):
    """Read one CSV data source into a dataframe.

    Args:
        source_name: The CSV source, in any form ``pandas.read_csv`` accepts
            (a path string, a path object, or an open file-like object).

    Returns:
        The dataframe parsed from the source.
    """
    # Hand the source over untouched: formatting it into a string first would
    # replace path objects and file handles with their text representation.
    return pd.read_csv(source_name)

In [28]:
# Drop rows with null values 
def drop_rows_with_null_values(df, thresh=2):
    """Return ``df`` without the rows that have too few non-null values.

    Args:
        df: The dataframe to clean. It is not modified.
        thresh: Minimum number of non-null values a row needs in order to be
            kept. Defaults to 2.

    Returns:
        A new dataframe containing only the rows that meet the threshold,
        with their original index labels.
    """
    # dropna already returns the surviving rows, so no index filtering is
    # needed afterwards.
    return df.dropna(axis='index', thresh=thresh)

In [29]:
# Fill missing values
def fill_missing_values(df):
    """Return a copy of ``df`` whose null ``report_type`` entries read 'ON SCENE'.

    Only the ``report_type`` column is given a default; nulls in every other
    column are left as they are.
    """
    column_defaults = {'report_type': 'ON SCENE'}
    filled = df.fillna(value=column_defaults)
    return filled

In [30]:
# Merge dataframes
def merge_dataframes(df_vehicles, df_crashes):
    """Left-join the vehicle rows onto the crash rows by ``crash_record_id``.

    Note the argument order: the vehicle frame comes first, the crash frame
    second. The crash frame is the left side of the join, so every crash row
    is kept. Columns present in both frames get the ``_left`` (crash) and
    ``_right`` (vehicle) suffixes.
    """
    return pd.merge(
        df_crashes,
        df_vehicles,
        on='crash_record_id',
        how='left',
        suffixes=('_left', '_right'),
    )

In [55]:
# Rename columns
def rename_columns(df):
    """Return a copy of ``df`` with ``vehicle_type`` renamed to ``vehicletypes``.

    All other columns keep their names; a frame without ``vehicle_type`` comes
    back with its columns unchanged.
    """
    return df.rename(columns={'vehicle_type': 'vehicletypes'})

## Define the Pipeline Functions to run the Cleansing and Transformation Functions

In [38]:
def read_data_pipeline(crash_file, vehicle_file):
    """Read the crash and vehicle CSV files, falling back to empty dataframes.

    Args:
        crash_file: Source of the crash data, passed to ``read_datasources``.
        vehicle_file: Source of the vehicle data, passed to
            ``read_datasources``.

    Returns:
        A ``(df_crash, df_vehicle)`` tuple. A frame that could not be read is
        returned as an empty dataframe, and the failure is logged.
    """
    # Bind both fallbacks up front so the return below is valid even when the
    # very first read fails.
    df_crash = pd.DataFrame()
    df_vehicle = pd.DataFrame()
    try:
        # Read the sources named by the arguments, not notebook-level globals.
        df_crash = read_datasources(crash_file)
        df_vehicle = read_datasources(vehicle_file)
    except Exception:
        # Best-effort stage: log the failure with its traceback at ERROR level
        # (INFO is hidden by the default logging configuration) and carry on.
        logging.exception("Exception in reading data pipeline")
    # Returning here rather than from a ``finally`` clause lets
    # KeyboardInterrupt and similar non-Exception errors propagate.
    return df_crash, df_vehicle

In [39]:
def drop_rows_with_null_values_pipeline(df_crash, df_vehicle):
    """Apply ``drop_rows_with_null_values`` to the crash and vehicle frames.

    Args:
        df_crash: The crash dataframe.
        df_vehicle: The vehicle dataframe.

    Returns:
        A ``(df_crash, df_vehicle)`` tuple with the cleaned frames. If
        cleaning a frame fails, the failure is logged and that frame (and any
        frame not yet processed) is returned as it was passed in.
    """
    try:
        df_crash = drop_rows_with_null_values(df_crash)
        # Rebind df_vehicle itself so the cleaned vehicle frame is the one
        # that gets returned.
        df_vehicle = drop_rows_with_null_values(df_vehicle)
    except Exception:
        # Best-effort stage: log with traceback and return what we have.
        logging.exception("Exception in dropping rows with null value data pipeline")
    # Returning outside ``finally`` keeps non-Exception errors from being
    # silently discarded.
    return df_crash, df_vehicle

In [40]:
def fill_missing_values_pipeline(df_crash, df_vehicle):
    """Apply ``fill_missing_values`` to the crash and vehicle frames.

    Args:
        df_crash: The crash dataframe.
        df_vehicle: The vehicle dataframe.

    Returns:
        A ``(df_crash, df_vehicle)`` tuple with the filled frames. If filling
        a frame fails, the failure is logged and that frame (and any frame
        not yet processed) is returned as it was passed in.
    """
    try:
        df_crash = fill_missing_values(df_crash)
        # Rebind df_vehicle itself so the filled vehicle frame is the one
        # that gets returned.
        df_vehicle = fill_missing_values(df_vehicle)
    except Exception:
        # Best-effort stage: log with traceback and return what we have.
        logging.exception("Exception in filling missing value pipeline")
    # Returning outside ``finally`` keeps non-Exception errors from being
    # silently discarded.
    return df_crash, df_vehicle

In [41]:
def merge_dataframes_pipeline(df_crash, df_vehicle):
    """Merge the given crash and vehicle frames with ``merge_dataframes``.

    Args:
        df_crash: The crash dataframe (left side of the join).
        df_vehicle: The vehicle dataframe (right side of the join).

    Returns:
        The merged dataframe, or an empty dataframe if the merge fails (the
        failure is logged).
    """
    # Fallback result so the return below is valid when the merge fails.
    df_agg = pd.DataFrame()
    try:
        # Merge the frames passed in, not the notebook-level globals.
        # merge_dataframes takes the vehicle frame first, the crash frame second.
        df_agg = merge_dataframes(df_vehicle, df_crash)
    except Exception:
        # Best-effort stage: log with traceback and return the fallback.
        logging.exception("Exception in merge dataframes pipeline")
    # Returning outside ``finally`` keeps non-Exception errors from being
    # silently discarded.
    return df_agg

In [42]:
def format_dataframes_pipeline(df_agg):
    """Rename the output columns of ``df_agg`` with ``rename_columns``.

    Args:
        df_agg: The dataframe whose columns should be renamed.

    Returns:
        The renamed dataframe, or ``df_agg`` unchanged if renaming fails (the
        failure is logged).
    """
    # Fall back to the input so the return below is valid when renaming fails.
    df_output = df_agg
    try:
        df_output = rename_columns(df_agg)
    except Exception:
        # Best-effort stage: log with traceback and return the fallback.
        logging.exception("Exception in renaming dataframe columns pipeline")
    # Returning outside ``finally`` keeps non-Exception errors from being
    # silently discarded.
    return df_output

In [57]:
# Define input data
crash_data_file = "traffic_crashes.csv"
vehicle_crash_data_file = "traffic_crash_vehicle.csv"

# Read Data Pipeline (use the path variables above instead of repeating the literals)
df_crash, df_vehicle = read_data_pipeline(crash_data_file, vehicle_crash_data_file)

# Drop Nulls
df_crash, df_vehicle = drop_rows_with_null_values_pipeline(df_crash, df_vehicle)

# Fill in Missing Values
df_crash, df_vehicle = fill_missing_values_pipeline(df_crash, df_vehicle)

# Merge Dataframes
df_agg = merge_dataframes_pipeline(df_crash, df_vehicle)

# Rename Columns
df_output = format_dataframes_pipeline(df_agg)

In [58]:
# Preview the first rows of the pipeline output.
df_output.head()

Unnamed: 0,crash_record_id,rd_no_left,crash_date_est_i,crash_date_left,posted_speed_limit,traffic_control_device,device_condition,weather_condition,lighting_condition,first_crash_type,...,trailer1_length,trailer2_length,total_vehicle_length,axle_cnt,vehicle_config,cargo_body_type,load_type,hazmat_out_of_service_i,mcs_out_of_service_i,hazmat_class
0,530411c8611eb0ccb9b25f16b2955cd21761fa1928dcaa...,JE494048,,2021-12-31T14:00:00.000,35,NO CONTROLS,NO CONTROLS,CLEAR,DAYLIGHT,TURNING,...,,,,,,,,,,
1,530411c8611eb0ccb9b25f16b2955cd21761fa1928dcaa...,JE494048,,2021-12-31T14:00:00.000,35,NO CONTROLS,NO CONTROLS,CLEAR,DAYLIGHT,TURNING,...,,,,,,,,,,
2,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,,2021-12-31T14:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,CLEAR,DUSK,TURNING,...,,,,,,,,,,
3,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,,2021-12-31T14:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,CLEAR,DUSK,TURNING,...,,,,,,,,,,
4,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,,2021-12-31T14:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,CLEAR,DUSK,TURNING,...,,,,,,,,,,


## Transformation Activities in Python

In [60]:
# Status flag per pipeline stage. Every stage starts out as "<NOT_EXECUTED>";
# the run cell compares these flags against "<OK>" to decide what to execute.
READING_CRASH_DATA_PIPELINE = (
    DROPPING_ROW_WITH_NULL_PIPELINE
) = FILLING_MISSING_VALUE_PIPELINE = MERGE_DATAFRAME_PIPELINE = "<NOT_EXECUTED>"

In [61]:
# Run the stages in order. Each stage is gated on the status flag of the stage
# before it, and sets its own flag once it has run. Separate ``if`` statements
# are used (not an ``elif`` chain) so that more than one stage can execute.
df_crash, df_vehicle = read_data_pipeline("traffic_crashes.csv", "traffic_crash_vehicle.csv")

# Only mark the read stage as successful when both frames actually hold data.
if not df_crash.empty and not df_vehicle.empty:
    READING_CRASH_DATA_PIPELINE = "<OK>"

# NOTE(review): the *_pipeline helpers handle their own exceptions, so "<OK>"
# below records that a stage was reached and ran, not that it changed the data.
if READING_CRASH_DATA_PIPELINE == "<OK>":
    df_crash, df_vehicle = drop_rows_with_null_values_pipeline(df_crash, df_vehicle)
    DROPPING_ROW_WITH_NULL_PIPELINE = "<OK>"

if DROPPING_ROW_WITH_NULL_PIPELINE == "<OK>":
    df_crash, df_vehicle = fill_missing_values_pipeline(df_crash, df_vehicle)
    FILLING_MISSING_VALUE_PIPELINE = "<OK>"

if FILLING_MISSING_VALUE_PIPELINE == "<OK>":
    # The merge stage returns a single dataframe, so it is bound to one name.
    df_agg = merge_dataframes_pipeline(df_crash, df_vehicle)
    MERGE_DATAFRAME_PIPELINE = "<OK>"