## Data Cleansing and Transformation in Python

In [2]:
# Import modules
import pandas as pd 
import logging
import numpy as np

In [3]:
# Build a small 4x5 demo frame; np.nan marks the deliberately missing cells
rows = [
    [1, 2, np.nan, 4, 5],
    [np.nan, 2, 6, 3, 7],
    [2, 2, 3, 5, 5],
    [5, 1, 3, np.nan, 5],
]
df = pd.DataFrame(rows, columns=list('ABCDE'))
print(df.head())

     A  B    C    D  E
0  1.0  2  NaN  4.0  5
1  NaN  2  6.0  3.0  7
2  2.0  2  3.0  5.0  5
3  5.0  1  3.0  NaN  5


In [4]:
# Impute each missing cell with the mean of its own column
column_means = df.mean()
df.fillna(column_means, inplace=True)
print(df.head())

          A  B    C    D  E
0  1.000000  2  4.0  4.0  5
1  2.666667  2  6.0  3.0  7
2  2.000000  2  3.0  5.0  5
3  5.000000  1  3.0  4.0  5


In [5]:
# Min-max scale column A so that its values fall between 0 and 1.
# The bounds are computed once up front; computing them inside a per-row lambda
# would rescan the whole column for every single row.
a_min = df['A'].min()
a_max = df['A'].max()
df['A'] = (df['A'] - a_min) / (a_max - a_min)
print(df.head())

          A  B    C    D  E
0  0.000000  2  4.0  4.0  5
1  0.416667  2  6.0  3.0  7
2  0.250000  2  3.0  5.0  5
3  1.000000  1  3.0  4.0  5


In [6]:
# File names of the two raw CSV extracts
crash_data_file, vehicle_crash_data_file = (
    "traffic_crashes.csv",
    "traffic_crash_vehicle.csv",
)

In [7]:
# Load both CSV extracts from the data/ folder into DataFrames
df_crashes, df_vehicles = (
    pd.read_csv(f"data/{file_name}")
    for file_name in (crash_data_file, vehicle_crash_data_file)
)

### Preliminary Tasks: The Importance of Staging Data  

In [8]:
df_crashes.head()

Unnamed: 0,crash_record_id,rd_no,crash_date_est_i,crash_date,posted_speed_limit,traffic_control_device,device_condition,weather_condition,lighting_condition,first_crash_type,...,injuries_non_incapacitating,injuries_reported_not_evident,injuries_no_indication,injuries_unknown,crash_hour,crash_day_of_week,crash_month,latitude,longitude,location
0,530411c8611eb0ccb9b25f16b2955cd21761fa1928dcaa...,JE494048,,2021-12-31T14:00:00.000,35,NO CONTROLS,NO CONTROLS,CLEAR,DAYLIGHT,TURNING,...,0.0,0.0,2.0,0.0,14,6,12,41.79485,-87.76728,POINT (-87.767280356289 41.794849958048)
1,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,,2021-12-31T14:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,CLEAR,DUSK,TURNING,...,0.0,0.0,2.0,0.0,14,6,12,41.881271,-87.686536,POINT (-87.686535940171 41.881270504288)
2,444221c2a9bc82fc4f301062ab22b482d7d661cf88fcdf...,JE494016,Y,2021-12-31T13:56:00.000,10,OTHER,NO CONTROLS,CLEAR,DAYLIGHT,SIDESWIPE SAME DIRECTION,...,0.0,0.0,2.0,0.0,13,6,12,41.722941,-87.662863,POINT (-87.662862871273 41.72294121821)
3,4603435fbb4ef5d45c0d805c3e9aa5558a311a140a737e...,JE494049,,2021-12-31T13:46:00.000,30,NO CONTROLS,NO CONTROLS,RAIN,DAYLIGHT,PEDALCYCLIST,...,1.0,0.0,2.0,0.0,13,6,12,41.766336,-87.57827,POINT (-87.578269718478 41.766335621716)
4,db62bb4534d0dae57112ea3ff8d50193784aaa732ed58d...,JE494000,,2021-12-31T13:45:00.000,30,TRAFFIC SIGNAL,FUNCTIONING PROPERLY,CLEAR,DAYLIGHT,SIDESWIPE SAME DIRECTION,...,0.0,0.0,2.0,0.0,13,6,12,41.75115,-87.607802,POINT (-87.607802036151 41.7511501753)


In [9]:
df_crashes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 49 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   crash_record_id                1000 non-null   object 
 1   rd_no                          1000 non-null   object 
 2   crash_date_est_i               69 non-null     object 
 3   crash_date                     1000 non-null   object 
 4   posted_speed_limit             1000 non-null   int64  
 5   traffic_control_device         1000 non-null   object 
 6   device_condition               1000 non-null   object 
 7   weather_condition              1000 non-null   object 
 8   lighting_condition             1000 non-null   object 
 9   first_crash_type               1000 non-null   object 
 10  trafficway_type                1000 non-null   object 
 11  lane_cnt                       1 non-null      float64
 12  alignment                      1000 non-null   ob

In [10]:
# Add a placeholder column called "test" in which every value is missing
df_crashes['test'] = None

# Count the missing values in each column
df_crashes.isna().sum()

crash_record_id                     0
rd_no                               0
crash_date_est_i                  931
crash_date                          0
posted_speed_limit                  0
traffic_control_device              0
device_condition                    0
weather_condition                   0
lighting_condition                  0
first_crash_type                    0
trafficway_type                     0
lane_cnt                          999
alignment                           0
roadway_surface_cond                0
road_defect                         0
report_type                        24
crash_type                          0
intersection_related_i            729
private_property_i                955
hit_and_run_i                     680
damage                              0
date_police_notified                0
prim_contributory_cause             0
sec_contributory_cause              0
street_no                           0
street_direction                    0
street_name 

In [11]:
# Drop every column that holds no data at all (all of its values are missing)
df_crashes.dropna(axis=1, how='all', inplace=True)

In [12]:
# Keep only the rows that have at least 2 populated (non-null) columns;
# rows with fewer than 2 populated columns are removed
df_crashes = df_crashes.dropna(axis=0, thresh=2)

In [13]:
df_crashes.isnull().sum() #notice that now the 'test' column has been removed since it had no values

crash_record_id                    0
rd_no                              0
crash_date_est_i                 931
crash_date                         0
posted_speed_limit                 0
traffic_control_device             0
device_condition                   0
weather_condition                  0
lighting_condition                 0
first_crash_type                   0
trafficway_type                    0
lane_cnt                         999
alignment                          0
roadway_surface_cond               0
road_defect                        0
report_type                       24
crash_type                         0
intersection_related_i           729
private_property_i               955
hit_and_run_i                    680
damage                             0
date_police_notified               0
prim_contributory_cause            0
sec_contributory_cause             0
street_no                          0
street_direction                   0
street_name                        0
b

#### Working with Missing Data

In [14]:
# List the distinct values of this column. Besides the two real categories,
# the result also contains NaN for the rows where report_type is missing.
df_crashes['report_type'].unique()  # ['ON SCENE', 'NOT ON SCENE (DESK REPORT)', nan]

array(['ON SCENE', 'NOT ON SCENE (DESK REPORT)', nan], dtype=object)

In [15]:
#show a count of each record grouped by report_type column
df_crashes['report_type'].value_counts()

report_type
NOT ON SCENE (DESK REPORT)    499
ON SCENE                      477
Name: count, dtype: int64

In [16]:
# Replace the missing report_type entries with the value 'ON SCENE'
report_type_default = {'report_type': 'ON SCENE'}
df_crashes = df_crashes.fillna(value=report_type_default)

In [17]:
#show a count of each record grouped by report_type column
df_crashes['report_type'].value_counts()

report_type
ON SCENE                      501
NOT ON SCENE (DESK REPORT)    499
Name: count, dtype: int64

In [18]:
df_vehicles.head()

Unnamed: 0,crash_unit_id,crash_record_id,rd_no,crash_date,unit_no,unit_type,num_passengers,vehicle_id,cmrc_veh_i,make,...,trailer1_length,trailer2_length,total_vehicle_length,axle_cnt,vehicle_config,cargo_body_type,load_type,hazmat_out_of_service_i,mcs_out_of_service_i,hazmat_class
0,1255349,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,2021-12-31T14:00:00.000,1,DRIVER,,1191605.0,,DODGE,...,,,,,,,,,,
1,1255350,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,2021-12-31T14:00:00.000,2,DRIVER,,1191608.0,,KIA,...,,,,,,,,,,
2,1255351,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,2021-12-31T14:00:00.000,3,PARKED,,1191614.0,,CHRYSLER,...,,,,,,,,,,
3,1255367,530411c8611eb0ccb9b25f16b2955cd21761fa1928dcaa...,JE494048,2021-12-31T14:00:00.000,1,DRIVER,,1191620.0,,TOYOTA,...,,,,,,,,,,
4,1255368,530411c8611eb0ccb9b25f16b2955cd21761fa1928dcaa...,JE494048,2021-12-31T14:00:00.000,2,DRIVER,,1191622.0,,GENERAL MOTORS CORPORATION (GMC),...,,,,,,,,,,


In [19]:
# Optional experiment, left disabled: delete the first 500 rows of df_vehicles
# df_vehicles = df_vehicles.iloc[500:]
df_vehicles= pd.read_csv(f"data/{vehicle_crash_data_file}") # re-read the CSV so df_vehicles holds the full file again

In [20]:
df_vehicles.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 72 columns):
 #   Column                    Non-Null Count  Dtype  
---  ------                    --------------  -----  
 0   crash_unit_id             1000 non-null   int64  
 1   crash_record_id           1000 non-null   object 
 2   rd_no                     1000 non-null   object 
 3   crash_date                1000 non-null   object 
 4   unit_no                   1000 non-null   int64  
 5   unit_type                 999 non-null    object 
 6   num_passengers            151 non-null    float64
 7   vehicle_id                981 non-null    float64
 8   cmrc_veh_i                16 non-null     object 
 9   make                      981 non-null    object 
 10  model                     981 non-null    object 
 11  lic_plate_state           881 non-null    object 
 12  vehicle_year              834 non-null    float64
 13  vehicle_defect            981 non-null    object 
 14  vehicle_t

#### Merging Data

In [21]:
# Left join on crash_record_id: every crash row is kept, and vehicle rows are attached
# wherever the id matches. Column names that exist in both frames receive the
# _CRASHES / _VEHICLES suffix.
df = pd.merge(
    df_crashes,
    df_vehicles,
    how='left',
    on='crash_record_id',
    suffixes=('_CRASHES', '_VEHICLES'),
)


In [22]:
print(df.shape)
df.head()

(1510, 119)


Unnamed: 0,crash_record_id,rd_no_CRASHES,crash_date_est_i,crash_date_CRASHES,posted_speed_limit,traffic_control_device,device_condition,weather_condition,lighting_condition,first_crash_type,...,trailer1_length,trailer2_length,total_vehicle_length,axle_cnt,vehicle_config,cargo_body_type,load_type,hazmat_out_of_service_i,mcs_out_of_service_i,hazmat_class
0,530411c8611eb0ccb9b25f16b2955cd21761fa1928dcaa...,JE494048,,2021-12-31T14:00:00.000,35,NO CONTROLS,NO CONTROLS,CLEAR,DAYLIGHT,TURNING,...,,,,,,,,,,
1,530411c8611eb0ccb9b25f16b2955cd21761fa1928dcaa...,JE494048,,2021-12-31T14:00:00.000,35,NO CONTROLS,NO CONTROLS,CLEAR,DAYLIGHT,TURNING,...,,,,,,,,,,
2,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,,2021-12-31T14:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,CLEAR,DUSK,TURNING,...,,,,,,,,,,
3,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,,2021-12-31T14:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,CLEAR,DUSK,TURNING,...,,,,,,,,,,
4,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,,2021-12-31T14:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,CLEAR,DUSK,TURNING,...,,,,,,,,,,


In [27]:
#list any crashes in df with no vehicle_id
df[df['vehicle_id'].isnull()]

Unnamed: 0,crash_record_id,rd_no_CRASHES,crash_date_est_i,crash_date_CRASHES,posted_speed_limit,traffic_control_device,device_condition,weather_condition,lighting_condition,first_crash_type,...,trailer1_length,trailer2_length,total_vehicle_length,axle_cnt,vehicle_config,cargo_body_type,load_type,hazmat_out_of_service_i,mcs_out_of_service_i,hazmat_class
7,4603435fbb4ef5d45c0d805c3e9aa5558a311a140a737e...,JE494049,,2021-12-31T13:46:00.000,30,NO CONTROLS,NO CONTROLS,RAIN,DAYLIGHT,PEDALCYCLIST,...,,,,,,,,,,
38,62acc5daf51d9f8af2f3e70098904e0a241fdd024b4637...,JE493932,,2021-12-31T12:47:00.000,30,NO CONTROLS,NO CONTROLS,CLOUDY/OVERCAST,DAYLIGHT,PEDESTRIAN,...,,,,,,,,,,
282,115349f24b3342680e086bf68e0b8bfde3379c5645d1f1...,JE493318,,2021-12-30T19:20:00.000,30,TRAFFIC SIGNAL,FUNCTIONING PROPERLY,FOG/SMOKE/HAZE,"DARKNESS, LIGHTED ROAD",PEDESTRIAN,...,,,,,,,,,,
315,a4ab86f88f61ac0f1d58e2f7ed730ec2c832a49c278aea...,JE493455,Y,2021-12-30T18:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,UNKNOWN,"DARKNESS, LIGHTED ROAD",PEDESTRIAN,...,,,,,,,,,,
324,e325a3584145f2b533843ee1205252fb421ffd2d6ae4bc...,JE493230,,2021-12-30T17:35:00.000,30,TRAFFIC SIGNAL,FUNCTIONING PROPERLY,CLOUDY/OVERCAST,"DARKNESS, LIGHTED ROAD",PEDESTRIAN,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1505,e18f871e1a58bc6029bb06ebc2970f0a3daa26878da737...,JE490031,,2021-12-27T03:00:00.000,30,NO CONTROLS,NO CONTROLS,CLEAR,DARKNESS,PARKED MOTOR VEHICLE,...,,,,,,,,,,
1506,3ba14e3e148a09331ede83eae1cd04ae8015c7d7317933...,JE489751,,2021-12-27T02:43:00.000,30,NO CONTROLS,NO CONTROLS,RAIN,"DARKNESS, LIGHTED ROAD",PARKED MOTOR VEHICLE,...,,,,,,,,,,
1507,c279f1b96a29c26d65381deb667760cbd21955dbf15a3b...,JE489753,,2021-12-27T02:35:00.000,30,NO CONTROLS,NO CONTROLS,RAIN,"DARKNESS, LIGHTED ROAD",HEAD ON,...,,,,,,,,,,
1508,c2920855959a0363eb73fe97e691c33d7d4a1282e5ca3c...,JE489738,,2021-12-27T02:05:00.000,30,TRAFFIC SIGNAL,FUNCTIONING PROPERLY,RAIN,"DARKNESS, LIGHTED ROAD",TURNING,...,,,,,,,,,,


In [26]:
# crash_record_id identifies one crash, and a single crash can involve several vehicles.
# Step 1: count the non-null vehicle_ids per crash. A crash with no matching vehicle row
# gets a count of 0, because count() ignores missing values.
df_crashes_num_involved = (
    df.groupby('crash_record_id')
    .agg(NumberOfVehiclesInvolved=('vehicle_id', 'count'))
    .reset_index()
)
# Step 2: count how many crashes fall into each "number of vehicles involved" bucket.
df_count_by_num_involved = (
    df_crashes_num_involved.groupby('NumberOfVehiclesInvolved')
    .agg(NumberOfCrashes=('crash_record_id', 'count'))
    .reset_index()
)
df_count_by_num_involved.head()



Unnamed: 0,NumberOfVehiclesInvolved,NumberOfCrashes
0,0,510
1,1,57
2,2,393
3,3,28
4,4,6


In [30]:
# Number of crash_record_id entries per vehicle_type, flattened back into ordinary columns
df_count_by_vehicle_type = (
    df.groupby('vehicle_type')['crash_record_id']
    .count()
    .reset_index()
)
print(df_count_by_vehicle_type)

                      vehicle_type  crash_record_id
0                BUS OVER 15 PASS.                5
1       MOPED OR MOTORIZED BICYCLE                1
2                            OTHER               20
3       OTHER VEHICLE WITH TRAILER                1
4                        PASSENGER              633
5                           PICKUP               33
6   SINGLE UNIT TRUCK WITH TRAILER                2
7      SPORT UTILITY VEHICLE (SUV)              138
8          TRACTOR W/ SEMI-TRAILER                5
9         TRACTOR W/O SEMI-TRAILER                2
10             TRUCK - SINGLE UNIT               14
11                      UNKNOWN/NA               89
12                    VAN/MINI-VAN               38


In [31]:
# Pull out the count stored for the 'PASSENGER' vehicle type
is_passenger = df_count_by_vehicle_type['vehicle_type'] == 'PASSENGER'
number_of_passenger_cars_involved = df_count_by_vehicle_type.loc[is_passenger, 'crash_record_id'].array[0]
print(number_of_passenger_cars_involved)

633


In [32]:
# Show crash_record_id, posted_speed_limit and vehicle_type for the first 1500 rows,
# ordered by vehicle_type.
vehicle_type_view = (
    df[['crash_record_id', 'posted_speed_limit', 'vehicle_type']]
    .sort_values(by='vehicle_type')
    .head(1500)
)
# Lift the row-display limit for this one output only. pd.set_option would change it for
# the whole session, so every later cell would also print its frames in full.
# display() is the notebook's built-in rich-output function; it is needed here because
# a bare last expression inside a with-block is not rendered.
with pd.option_context('display.max_rows', None):
    display(vehicle_type_view)

Unnamed: 0,crash_record_id,posted_speed_limit,vehicle_type
428,8475fc0426a2e02d9723a791827792fe47cbba3a9edf25...,35,BUS OVER 15 PASS.
63,bfb452d8bde44f56587b494a3291dae40a47c5c64eb3d2...,30,BUS OVER 15 PASS.
825,b81bc04444aa4a3f8494ae01d0f8ce01d7efb4a7e39da4...,30,BUS OVER 15 PASS.
341,fdc8973e4d39a72c85ff78fe872ba0aeaefb4facb71721...,30,BUS OVER 15 PASS.
820,b9016fb256856ae29b86353493b4d5aea8166d51f72fb6...,25,BUS OVER 15 PASS.
43,d6b30daf6d409e36503555e11c38be88be9c6a21913386...,30,MOPED OR MOTORIZED BICYCLE
132,181a4321bb2d6492518b87b2d483bef22d8d9f7795e4ea...,30,OTHER
527,edf43093e857b31983beab22908a26bdf3e2459bc4aa29...,5,OTHER
246,85cb50ee4a616ea80685969d0dc9e228eb22ae3462c576...,30,OTHER
513,0420398ed44c54180dc1c6a36131b0b64be67c1377511f...,30,OTHER


In [33]:
# Print the structural summary of the merged frame
df.info()
# Shape of the subset whose vehicle_type is missing; its first entry is the number of such rows
vehicle_type_missing = df['vehicle_type'].isna()
df.loc[vehicle_type_missing].shape

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1510 entries, 0 to 1509
Columns: 119 entries, crash_record_id to hazmat_class
dtypes: float64(35), int64(7), object(77)
memory usage: 1.4+ MB


(529, 119)

In [34]:
# Count crash_record_id entries per vehicle_type and show the resulting frame
df_agg = (
    df.groupby('vehicle_type')['crash_record_id']
    .count()
    .reset_index()
)
df_agg

Unnamed: 0,vehicle_type,crash_record_id
0,BUS OVER 15 PASS.,5
1,MOPED OR MOTORIZED BICYCLE,1
2,OTHER,20
3,OTHER VEHICLE WITH TRAILER,1
4,PASSENGER,633
5,PICKUP,33
6,SINGLE UNIT TRUCK WITH TRAILER,2
7,SPORT UTILITY VEHICLE (SUV),138
8,TRACTOR W/ SEMI-TRAILER,5
9,TRACTOR W/O SEMI-TRAILER,2


In [35]:
# Look up the count recorded on the 'PASSENGER' row of df_agg
passenger_rows = df_agg['vehicle_type'] == 'PASSENGER'
number_of_passenger_cars_involved = df_agg.loc[passenger_rows, 'crash_record_id'].array[0]
number_of_passenger_cars_involved

633

#### Data Mapping with Target Data 

In [41]:

# Source-to-target column mapping used when renaming columns for the output:
# each pandas column name (key) maps to the column name of the target db table (value)
mapping_dict = dict(
    vehicle_type='vehicletypes',
    crash_record_id='crashrecordid',
)

print(mapping_dict)


{'vehicle_type': 'vehicletypes', 'crash_record_id': 'crashrecordid'}


In [42]:

df_agg = df_agg.rename(columns=mapping_dict)

In [43]:
df_agg

Unnamed: 0,vehicletypes,crashrecordid
0,BUS OVER 15 PASS.,5
1,MOPED OR MOTORIZED BICYCLE,1
2,OTHER,20
3,OTHER VEHICLE WITH TRAILER,1
4,PASSENGER,633
5,PICKUP,33
6,SINGLE UNIT TRUCK WITH TRAILER,2
7,SPORT UTILITY VEHICLE (SUV),138
8,TRACTOR W/ SEMI-TRAILER,5
9,TRACTOR W/O SEMI-TRAILER,2


### Writing Transformation Functions

In [44]:
def get_transformed_data(crash_file, vehicle_file):
    """Load, clean, merge and aggregate the crash and vehicle CSV files.

    Args:
        crash_file: File name of the crash CSV inside the ``data/`` folder.
        vehicle_file: File name of the vehicle CSV inside the ``data/`` folder.

    Returns:
        A DataFrame with one row per vehicle type. ``vehicletypes`` holds the
        type and ``crash_record_id`` holds the number of merged rows of that type.
    """
    # import data
    df_crashes = pd.read_csv(f"data/{crash_file}")
    df_vehicles = pd.read_csv(f"data/{vehicle_file}")

    # Drop crash rows that have fewer than 2 populated columns, and keep the result.
    # (The filtered frame used to be computed and then discarded, so the sparse
    # rows still took part in the merge.)
    df_crashes = df_crashes.dropna(axis='index', thresh=2)
    # Missing report types default to 'ON SCENE'
    df_crashes = df_crashes.fillna(value={'report_type': 'ON SCENE'})

    # merge crashes and vehicles (left join keeps every remaining crash)
    df = df_crashes.merge(df_vehicles, how='left', on='crash_record_id', suffixes=('_left', '_right'))
    df_agg = df.groupby('vehicle_type').agg({'crash_record_id': 'count'}).reset_index()

    # transform column names for output data
    vehicle_mapping = {'vehicle_type': 'vehicletypes'}
    return df_agg.rename(columns=vehicle_mapping)

In [45]:
get_transformed_data(crash_data_file,vehicle_crash_data_file) 

Unnamed: 0,vehicletypes,crash_record_id
0,BUS OVER 15 PASS.,5
1,MOPED OR MOTORIZED BICYCLE,1
2,OTHER,20
3,OTHER VEHICLE WITH TRAILER,1
4,PASSENGER,633
5,PICKUP,33
6,SINGLE UNIT TRUCK WITH TRAILER,2
7,SPORT UTILITY VEHICLE (SUV),138
8,TRACTOR W/ SEMI-TRAILER,5
9,TRACTOR W/O SEMI-TRAILER,2


### Running the Workflow

#### The preceding code can be split into reusable functions that are easy to manage

In [46]:
# Read data from data source
def read_datasources(source_name):
    """Load the CSV file named ``source_name`` from the ``data/`` folder as a DataFrame."""
    return pd.read_csv(f"data/{source_name}")

In [47]:
# Drop rows with null values
def drop_rows_with_null_values(df, thresh=2):
    """Return ``df`` without the rows that are almost entirely empty.

    Args:
        df: DataFrame to clean; it is not modified.
        thresh: Minimum number of non-null values a row needs in order to be kept.

    Returns:
        A new DataFrame containing only the rows that have at least ``thresh``
        populated columns.
    """
    # dropna already yields the rows to keep. The earlier version went on to select
    # the index labels *absent* from that result, which handed back only the
    # discarded rows.
    return df.dropna(axis='index', thresh=thresh)

In [48]:
# Fill missing values
def fill_missing_values(df):
    """Return a copy of ``df`` whose missing ``report_type`` values are set to 'ON SCENE'."""
    replacements = {'report_type': 'ON SCENE'}
    return df.fillna(value=replacements)

In [49]:
# Merge Dataframes
def merge_dataframes(df_vehicles,df_crashes):
    """Left-join the vehicle rows onto the crash rows using ``crash_record_id``.

    Every crash row is kept. Column names found in both frames get the
    ``_left`` (crashes) or ``_right`` (vehicles) suffix.
    """
    return pd.merge(
        df_crashes,
        df_vehicles,
        how='left',
        on='crash_record_id',
        suffixes=('_left', '_right'),
    )

In [50]:
# Rename Columns
def rename_columns(df):
    """Return a copy of ``df`` in which the ``vehicle_type`` column is named ``vehicletypes``."""
    return df.rename(columns={'vehicle_type': 'vehicletypes'})

#### Define the Pipeline Functions to run the Cleansing and Transformation Functions

In [51]:
def read_data_pipeline(crash_file, vehicle_file):
    """Read the crash and vehicle source files.

    Args:
        crash_file: File name of the crash data, passed to ``read_datasources``.
        vehicle_file: File name of the vehicle data, passed to ``read_datasources``.

    Returns:
        Tuple ``(df_crash, df_vehicle)``. A frame that could not be read is
        returned as an empty DataFrame and the failure is logged.
    """
    # Empty defaults so that both names are always bound, even when a read fails.
    df_crash = pd.DataFrame()
    df_vehicle = pd.DataFrame()
    try:
        df_crash = read_datasources(crash_file)
        df_vehicle = read_datasources(vehicle_file)
    except Exception:
        # logging.exception records at ERROR level and includes the traceback;
        # an INFO record is filtered out by the default logging configuration.
        logging.exception("Exception in reading data pipeline")
    # Returning after the try block (not from a finally clause) lets
    # KeyboardInterrupt and SystemExit propagate normally.
    return df_crash, df_vehicle

In [52]:
def drop_rows_with_null_values_pipeline(df_crash, df_vehicle):
    """Apply ``drop_rows_with_null_values`` to the crash and the vehicle frame.

    Args:
        df_crash: Crash DataFrame.
        df_vehicle: Vehicle DataFrame.

    Returns:
        Tuple ``(df_crash, df_vehicle)``. If a step raises, the failure is logged
        and each frame is returned in whatever state it had reached (a frame
        whose step did not complete is returned unchanged).
    """
    try:
        df_crash = drop_rows_with_null_values(df_crash)
        df_vehicle = drop_rows_with_null_values(df_vehicle)
    except Exception:
        # ERROR level with traceback; an INFO record is not shown by default.
        logging.exception("Exception in dropping rows with null value data pipeline")
    # A return inside finally would also swallow KeyboardInterrupt / SystemExit.
    return df_crash, df_vehicle

In [53]:
def fill_missing_values_pipeline(df_crash, df_vehicle):
    """Apply ``fill_missing_values`` to the crash and the vehicle frame.

    Args:
        df_crash: Crash DataFrame.
        df_vehicle: Vehicle DataFrame.

    Returns:
        Tuple ``(df_crash, df_vehicle)`` with both frames filled. If a step
        raises, the failure is logged and a frame whose step did not complete
        is returned unchanged.
    """
    try:
        df_crash = fill_missing_values(df_crash)
        # Assign back to df_vehicle, the name that is returned. Storing the result
        # under a different name would silently discard the filled vehicle frame.
        df_vehicle = fill_missing_values(df_vehicle)
    except Exception:
        # ERROR level with traceback; an INFO record is not shown by default.
        logging.exception("Exception in filling missing value pipeline")
    return df_crash, df_vehicle

In [54]:
def merge_dataframes_pipeline(df_crash, df_vehicle):
    """Merge the crash and vehicle frames through ``merge_dataframes``.

    Args:
        df_crash: Crash DataFrame (the left side of the merge).
        df_vehicle: Vehicle DataFrame (the right side of the merge).

    Returns:
        The merged DataFrame produced by ``merge_dataframes``.

    Raises:
        Exception: Whatever ``merge_dataframes`` raised, re-raised after logging.
            There is no meaningful merged frame to return in that case.
    """
    try:
        # Merge the frames passed in as arguments, not same-named notebook globals.
        # merge_dataframes takes the vehicle frame first.
        df_agg = merge_dataframes(df_vehicle, df_crash)
    except Exception:
        logging.exception("Exception in merge dataframes pipeline")
        raise
    return df_agg

In [55]:
def format_dataframes_pipeline(df_agg):
    """Rename the output columns of ``df_agg`` through ``rename_columns``.

    Args:
        df_agg: DataFrame to format.

    Returns:
        The DataFrame returned by ``rename_columns``.

    Raises:
        Exception: Whatever ``rename_columns`` raised, re-raised after logging.
            Returning from a finally clause here would instead fail with an
            unrelated UnboundLocalError and hide the real cause.
    """
    try:
        df_output = rename_columns(df_agg)
    except Exception:
        logging.exception("Exception in renaming dataframe columns pipeline")
        raise
    return df_output

#### Use the Chicago Traffic Data and Run the Pipeline Workflow

In [56]:
# Define input data
crash_data_file = "traffic_crashes.csv"
vehicle_crash_data_file = "traffic_crash_vehicle.csv"

# Read Data Pipeline (pass the names defined above instead of repeating the literals)
df_crash, df_vehicle = read_data_pipeline(crash_data_file, vehicle_crash_data_file)

# Drop Nulls
df_crash, df_vehicle = drop_rows_with_null_values_pipeline(df_crash, df_vehicle)

# Fill in Missing Values
df_crash, df_vehicle = fill_missing_values_pipeline(df_crash, df_vehicle)

# Merge Dataframes
df_agg = merge_dataframes_pipeline(df_crash, df_vehicle)

# Format Dataframes (rename the columns for the output)
df_output = format_dataframes_pipeline(df_agg)

In [57]:
df_output.head()

Unnamed: 0,crash_record_id,rd_no_left,crash_date_est_i,crash_date_left,posted_speed_limit,traffic_control_device,device_condition,weather_condition,lighting_condition,first_crash_type,...,trailer1_length,trailer2_length,total_vehicle_length,axle_cnt,vehicle_config,cargo_body_type,load_type,hazmat_out_of_service_i,mcs_out_of_service_i,hazmat_class
0,530411c8611eb0ccb9b25f16b2955cd21761fa1928dcaa...,JE494048,,2021-12-31T14:00:00.000,35,NO CONTROLS,NO CONTROLS,CLEAR,DAYLIGHT,TURNING,...,,,,,,,,,,
1,530411c8611eb0ccb9b25f16b2955cd21761fa1928dcaa...,JE494048,,2021-12-31T14:00:00.000,35,NO CONTROLS,NO CONTROLS,CLEAR,DAYLIGHT,TURNING,...,,,,,,,,,,
2,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,,2021-12-31T14:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,CLEAR,DUSK,TURNING,...,,,,,,,,,,
3,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,,2021-12-31T14:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,CLEAR,DUSK,TURNING,...,,,,,,,,,,
4,305b06235b250aa0029c07313c84f969f4bc13c1cc3715...,JE494008,,2021-12-31T14:00:00.000,30,TRAFFIC SIGNAL,UNKNOWN,CLEAR,DUSK,TURNING,...,,,,,,,,,,


### Transformation Activities in Python 

In [58]:
# Status flag of each pipeline stage; every stage starts out as not executed
READING_CRASH_DATA_PIPELINE = DROPPING_ROW_WITH_NULL_PIPELINE = "<NOT_EXECUTED>"
FILLING_MISSING_VALUE_PIPELINE = MERGE_DATAFRAME_PIPELINE = "<NOT_EXECUTED>"

In [59]:
def _run_stage(stage, *args):
    """Run one pipeline stage and return ``(status, result)``.

    ``status`` is "<OK>" when the stage returned normally and "<FAILED>" when it
    raised; on failure ``result`` is None and the error is logged with its traceback.
    """
    try:
        return "<OK>", stage(*args)
    except Exception:
        logging.exception("Pipeline stage %s failed", stage.__name__)
        return "<FAILED>", None


# Reset the downstream flags so that re-running this cell never acts on a stale "<OK>".
DROPPING_ROW_WITH_NULL_PIPELINE = "<NOT_EXECUTED>"
FILLING_MISSING_VALUE_PIPELINE = "<NOT_EXECUTED>"
MERGE_DATAFRAME_PIPELINE = "<NOT_EXECUTED>"

# Each stage runs only if the previous one reported "<OK>", and then records its own
# outcome in its status flag. Separate if-statements are required: an if/elif chain
# runs at most one branch, and flags that are never updated leave every stage skipped.
READING_CRASH_DATA_PIPELINE, stage_output = _run_stage(
    read_data_pipeline, "traffic_crashes.csv", "traffic_crash_vehicle.csv"
)

if READING_CRASH_DATA_PIPELINE == "<OK>":
    df_crash, df_vehicle = stage_output
    DROPPING_ROW_WITH_NULL_PIPELINE, stage_output = _run_stage(
        drop_rows_with_null_values_pipeline, df_crash, df_vehicle
    )

if DROPPING_ROW_WITH_NULL_PIPELINE == "<OK>":
    df_crash, df_vehicle = stage_output
    FILLING_MISSING_VALUE_PIPELINE, stage_output = _run_stage(
        fill_missing_values_pipeline, df_crash, df_vehicle
    )

if FILLING_MISSING_VALUE_PIPELINE == "<OK>":
    df_crash, df_vehicle = stage_output
    # The merge stage returns one combined frame, not a (crash, vehicle) pair.
    MERGE_DATAFRAME_PIPELINE, df_merged = _run_stage(
        merge_dataframes_pipeline, df_crash, df_vehicle
    )

In [64]:
# Largest value in the "vehicle_year" column of df_agg, converted to an int before printing
max_vehicle_year = int(df_agg['vehicle_year'].max())
print(max_vehicle_year)

2099
