In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime, col, isnan, when, count, to_date, unix_timestamp, date_format, to_timestamp, lit

import pandas as pd

import os

In [2]:
# Start (or reuse) a Spark session that uses every local core, with the driver
# heap and the collected-result limit raised for the later toPandas() calls.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "15g")
    .config("spark.driver.maxResultSize", "8g")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/13 14:13:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Import dataset

In [3]:
# Input and output locations, relative to the notebook's working directory.
raw_data_dir = '../dataset/raw/'
cleaned_data_dir = '../dataset/cleaned/'

# Both files are comma-separated with a header row; no schema is inferred here.
incident_report = (
    spark.read
    .options(sep=',', header=True)
    .csv(os.path.join(raw_data_dir, 'Traffic_Crashes_Incidents.csv'))
)
people_info = (
    spark.read
    .options(sep=',', header=True)
    .csv(os.path.join(raw_data_dir, 'Traffic_Crashes_People.csv'))
)

In [4]:
# Preview the first five incident rows, then list every available column name.
incident_report.show(n=5)
print(incident_report.columns)

24/08/13 14:13:24 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+----------------+--------------------+------------------+----------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------+------------------+--------------------+-----------+-----------+--------------------+----------------------+------------------+-------------+-----------+--------------------+-----------------------+----------------------+---------+----------------+-----------------+------------------+--------------+------------------+---------+-----------+--------------+-----------------+---------+--------------------+--------------+--------------+-----------------------+---------------------------+-----------------------------+----------------------+----------------+----------+-----------------+-----------+------------+-------------+--------------------+
|     CRASH_RECORD_ID|CRASH_DATE_EST_I|          CRASH_DATE|POSTED_SPEED_LIMIT|TRAFFIC_CONTROL_DEVICE|    DEVICE_CONDITION|WEATHER_CONDITION

In [5]:
# Preview the first five people rows, then list every available column name.
people_info.show(n=5)
print(people_info.columns)

+---------+-----------+--------------------+----------+--------------------+-------+-------+-----+-------+---+----+---------------------+---------------------+----------------+------------------+--------+---------------------+--------+----------+----------+----------------+-------------+------------------+---------------+-------------------+-----------------+----------------+----------------+--------------+
|PERSON_ID|PERSON_TYPE|     CRASH_RECORD_ID|VEHICLE_ID|          CRASH_DATE|SEAT_NO|   CITY|STATE|ZIPCODE|SEX| AGE|DRIVERS_LICENSE_STATE|DRIVERS_LICENSE_CLASS|SAFETY_EQUIPMENT|   AIRBAG_DEPLOYED|EJECTION|INJURY_CLASSIFICATION|HOSPITAL|EMS_AGENCY|EMS_RUN_NO|   DRIVER_ACTION|DRIVER_VISION|PHYSICAL_CONDITION|PEDPEDAL_ACTION|PEDPEDAL_VISIBILITY|PEDPEDAL_LOCATION|      BAC_RESULT|BAC_RESULT VALUE|CELL_PHONE_USE|
+---------+-----------+--------------------+----------+--------------------+-------+-------+-----+-------+---+----+---------------------+---------------------+----------------+--

## Select the columns needed for later plots

In [6]:
# Keep only the incident-level columns used downstream.
incident_report = incident_report.select(
    'CRASH_RECORD_ID',
    'CRASH_DATE',
    'POSTED_SPEED_LIMIT',
    'TRAFFIC_CONTROL_DEVICE',
    'DEVICE_CONDITION',
    'WEATHER_CONDITION',
    'LIGHTING_CONDITION',
    'FIRST_CRASH_TYPE',
    'TRAFFICWAY_TYPE',
    'LANE_CNT',
    'ALIGNMENT',
    'ROADWAY_SURFACE_COND',
    'ROAD_DEFECT',
    'CRASH_TYPE',
    'INTERSECTION_RELATED_I',
    'NOT_RIGHT_OF_WAY_I',
    'HIT_AND_RUN_I',
    'PRIM_CONTRIBUTORY_CAUSE',
    'SEC_CONTRIBUTORY_CAUSE',
    'DOORING_I',
    'WORK_ZONE_I',
    'WORK_ZONE_TYPE',
    'WORKERS_PRESENT_I',
    'INJURIES_TOTAL',
    'INJURIES_FATAL',
    'INJURIES_INCAPACITATING',
    'INJURIES_NON_INCAPACITATING',
    'INJURIES_REPORTED_NOT_EVIDENT',
    'INJURIES_NO_INDICATION',
    'INJURIES_UNKNOWN',
    'CRASH_HOUR',
    'CRASH_DAY_OF_WEEK',
    'CRASH_MONTH',
)

# Keep only the person-level columns used downstream.
people_info = people_info.select(
    'PERSON_ID',
    'PERSON_TYPE',
    'CRASH_RECORD_ID',
    'STATE',
    'SEX',
    'AGE',
    'SAFETY_EQUIPMENT',
    'AIRBAG_DEPLOYED',
    'EJECTION',
    'INJURY_CLASSIFICATION',
    'DRIVER_ACTION',
    'DRIVER_VISION',
    'PHYSICAL_CONDITION',
    'PEDPEDAL_ACTION',
    'PEDPEDAL_VISIBILITY',
    'PEDPEDAL_LOCATION',
    'BAC_RESULT',
    'BAC_RESULT VALUE',
    'CELL_PHONE_USE',
)

## Handle NaN/null values

Replace missing values in all columns of the `incident_report` and `people_info` dataframes with the placeholder value *UNKNOWN*.

In [7]:
# For each column, count the rows whose value is NaN or null (one output row).
incident_report.select(
    *[
        count(when(isnan(name) | col(name).isNull(), name)).alias(name)
        for name in incident_report.columns
    ]
).show()



+---------------+----------+------------------+----------------------+----------------+-----------------+------------------+----------------+---------------+--------+---------+--------------------+-----------+----------+----------------------+------------------+-------------+-----------------------+----------------------+---------+-----------+--------------+-----------------+--------------+--------------+-----------------------+---------------------------+-----------------------------+----------------------+----------------+----------+-----------------+-----------+
|CRASH_RECORD_ID|CRASH_DATE|POSTED_SPEED_LIMIT|TRAFFIC_CONTROL_DEVICE|DEVICE_CONDITION|WEATHER_CONDITION|LIGHTING_CONDITION|FIRST_CRASH_TYPE|TRAFFICWAY_TYPE|LANE_CNT|ALIGNMENT|ROADWAY_SURFACE_COND|ROAD_DEFECT|CRASH_TYPE|INTERSECTION_RELATED_I|NOT_RIGHT_OF_WAY_I|HIT_AND_RUN_I|PRIM_CONTRIBUTORY_CAUSE|SEC_CONTRIBUTORY_CAUSE|DOORING_I|WORK_ZONE_I|WORK_ZONE_TYPE|WORKERS_PRESENT_I|INJURIES_TOTAL|INJURIES_FATAL|INJURIES_INCAPACITAT

                                                                                

In [8]:
# For each column, count the rows whose value is NaN or null (one output row).
people_info.select(
    *[
        count(when(isnan(name) | col(name).isNull(), name)).alias(name)
        for name in people_info.columns
    ]
).show()



+---------+-----------+---------------+------+-----+------+----------------+---------------+--------+---------------------+-------------+-------------+------------------+---------------+-------------------+-----------------+----------+----------------+--------------+
|PERSON_ID|PERSON_TYPE|CRASH_RECORD_ID| STATE|  SEX|   AGE|SAFETY_EQUIPMENT|AIRBAG_DEPLOYED|EJECTION|INJURY_CLASSIFICATION|DRIVER_ACTION|DRIVER_VISION|PHYSICAL_CONDITION|PEDPEDAL_ACTION|PEDPEDAL_VISIBILITY|PEDPEDAL_LOCATION|BAC_RESULT|BAC_RESULT VALUE|CELL_PHONE_USE|
+---------+-----------+---------------+------+-----+------+----------------+---------------+--------+---------------------+-------------+-------------+------------------+---------------+-------------------+-----------------+----------+----------------+--------------+
|        0|          0|              0|494261|31714|551590|            5257|          36771|   23511|                  737|       386076|       386640|            385049|        1854586|          

                                                                                

In [9]:
# Substitute the 'UNKNOWN' placeholder for missing values in both frames.
incident_report = incident_report.na.fill('UNKNOWN')
people_info = people_info.na.fill('UNKNOWN')

In [10]:
# Re-count NaN/null values per column to check the incident frame after filling.
incident_report.select(
    *[
        count(when(isnan(name) | col(name).isNull(), name)).alias(name)
        for name in incident_report.columns
    ]
).show()



+---------------+----------+------------------+----------------------+----------------+-----------------+------------------+----------------+---------------+--------+---------+--------------------+-----------+----------+----------------------+------------------+-------------+-----------------------+----------------------+---------+-----------+--------------+-----------------+--------------+--------------+-----------------------+---------------------------+-----------------------------+----------------------+----------------+----------+-----------------+-----------+
|CRASH_RECORD_ID|CRASH_DATE|POSTED_SPEED_LIMIT|TRAFFIC_CONTROL_DEVICE|DEVICE_CONDITION|WEATHER_CONDITION|LIGHTING_CONDITION|FIRST_CRASH_TYPE|TRAFFICWAY_TYPE|LANE_CNT|ALIGNMENT|ROADWAY_SURFACE_COND|ROAD_DEFECT|CRASH_TYPE|INTERSECTION_RELATED_I|NOT_RIGHT_OF_WAY_I|HIT_AND_RUN_I|PRIM_CONTRIBUTORY_CAUSE|SEC_CONTRIBUTORY_CAUSE|DOORING_I|WORK_ZONE_I|WORK_ZONE_TYPE|WORKERS_PRESENT_I|INJURIES_TOTAL|INJURIES_FATAL|INJURIES_INCAPACITAT

                                                                                

In [11]:
# Re-count NaN/null values per column to check the people frame after filling.
people_info.select(
    *[
        count(when(isnan(name) | col(name).isNull(), name)).alias(name)
        for name in people_info.columns
    ]
).show()



+---------+-----------+---------------+-----+---+---+----------------+---------------+--------+---------------------+-------------+-------------+------------------+---------------+-------------------+-----------------+----------+----------------+--------------+
|PERSON_ID|PERSON_TYPE|CRASH_RECORD_ID|STATE|SEX|AGE|SAFETY_EQUIPMENT|AIRBAG_DEPLOYED|EJECTION|INJURY_CLASSIFICATION|DRIVER_ACTION|DRIVER_VISION|PHYSICAL_CONDITION|PEDPEDAL_ACTION|PEDPEDAL_VISIBILITY|PEDPEDAL_LOCATION|BAC_RESULT|BAC_RESULT VALUE|CELL_PHONE_USE|
+---------+-----------+---------------+-----+---+---+----------------+---------------+--------+---------------------+-------------+-------------+------------------+---------------+-------------------+-----------------+----------+----------------+--------------+
|        0|          0|              0|    0|  0|  0|               0|              0|       0|                    0|            0|            0|                 0|              0|                  0|              

                                                                                

In [12]:
# Inner-join incidents with people on the shared crash key. Passing the key by
# name (rather than as an equality expression) keeps a single CRASH_RECORD_ID
# column in the result; the expression form carries both sides' copies into the
# joined frame and therefore into the header of the CSV written from it.
join_df = incident_report.join(people_info, on='CRASH_RECORD_ID', how='inner')

print('total rows:', join_df.count())

[Stage 20:>                                                       (0 + 12) / 13]

total rows: 1891040


                                                                                

In [13]:
# Remove rows that are identical across all columns, then report the row count.
join_df = join_df.dropDuplicates()
print('total rows:', join_df.count())



total rows: 1891040


                                                                                

In [14]:
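# NOTE: dropping 'CRASH_RECORD_ID' is intentionally left disabled, so the key
# stays in the joined data that is saved below.
# join_df = join_df.drop('CRASH_RECORD_ID')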

## Save data to csv file

In [15]:
# Make sure the output directory exists; to_csv does not create missing
# parent directories and would fail on a fresh checkout.
os.makedirs(cleaned_data_dir, exist_ok=True)

# toPandas() collects the whole joined frame onto the driver so it can be
# written out as one CSV file.
pandas_df = join_df.toPandas()
pandas_df.to_csv(os.path.join(cleaned_data_dir, 'cleaned_joined_data.csv'), index=False)

                                                                                

In [18]:
# Make sure the output directory exists; to_csv does not create missing
# parent directories and would fail on a fresh checkout.
os.makedirs(cleaned_data_dir, exist_ok=True)

# De-duplicate the cleaned incident rows, collect them onto the driver and
# write them out as one CSV file.
incident_report = incident_report.dropDuplicates()
pandas_df = incident_report.toPandas()
pandas_df.to_csv(os.path.join(cleaned_data_dir, 'cleaned_incident_data.csv'), index=False)

                                                                                

In [19]:
# Make sure the output directory exists; to_csv does not create missing
# parent directories and would fail on a fresh checkout.
os.makedirs(cleaned_data_dir, exist_ok=True)

# De-duplicate the cleaned person rows, collect them onto the driver and
# write them out as one CSV file.
people_info = people_info.dropDuplicates()
pandas_df = people_info.toPandas()
pandas_df.to_csv(os.path.join(cleaned_data_dir, 'cleaned_person_data.csv'), index=False)

                                                                                