## CASE STUDY

In [5]:
# importing required packages
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, count, col, dense_rank
from pyspark.sql.window import Window
import filepaths

In [6]:
# creating spark session(spark entry point)
spark = SparkSession.builder.appName('CaseStudySession').getOrCreate()

In [13]:
# Method to read CSV data in spark dataframe
def read_csv_df(path='', header=True, inferSchema=True):
    """Method to read CSV file and return as a spark dataframe
       This function will go through the input once to determine the input schema if
       'inferSchema' is enabled. To avoid going through the entire data once, disable
       
    Parameters:
        path: string for input path.
        inferSchema: bool, optional
            infers the input schema automatically from data. It requires one extra
            pass over the data. It uses the default value, 'True'.
        header: bool, optional
            uses the first line as names of columns. It uses the default value, 'True'.
    """
    if path:
        return spark.read.csv(path, header=header, inferSchema=inferSchema).dropDuplicates()
    raise Exception('Please provide file path')

# Method to write CSV data from spark dataframe
def write_csv_df(df=None, path='', header=True):
    """Method to write spark dataframe as CSV file
    
    Parameters:
        df: spark dataframe which needs to written as file.
        path: string for output path.
        header: bool, optional
            writes first line as names of columns. It uses the default value, 'True'.
    """

    if path and df:
        df.write.csv(path, header=header)
        return
    raise Exception('Please check file path and dataframe')

## Reading Primary Person Data

In [8]:
# Reading Primary Person CSV Data in a spark dataframe
primary_person_df = read_csv_df(filepaths.PRIMARY_PERSON_DATA_PATH, header=True, inferSchema=True)

# Inferred Schema of primary person dataframe
primary_person_df.printSchema()

root
 |-- CRASH_ID: integer (nullable = true)
 |-- UNIT_NBR: integer (nullable = true)
 |-- PRSN_NBR: integer (nullable = true)
 |-- PRSN_TYPE_ID: string (nullable = true)
 |-- PRSN_OCCPNT_POS_ID: string (nullable = true)
 |-- PRSN_INJRY_SEV_ID: string (nullable = true)
 |-- PRSN_AGE: string (nullable = true)
 |-- PRSN_ETHNICITY_ID: string (nullable = true)
 |-- PRSN_GNDR_ID: string (nullable = true)
 |-- PRSN_EJCT_ID: string (nullable = true)
 |-- PRSN_REST_ID: string (nullable = true)
 |-- PRSN_AIRBAG_ID: string (nullable = true)
 |-- PRSN_HELMET_ID: string (nullable = true)
 |-- PRSN_SOL_FL: string (nullable = true)
 |-- PRSN_ALC_SPEC_TYPE_ID: string (nullable = true)
 |-- PRSN_ALC_RSLT_ID: string (nullable = true)
 |-- PRSN_BAC_TEST_RSLT: string (nullable = true)
 |-- PRSN_DRG_SPEC_TYPE_ID: string (nullable = true)
 |-- PRSN_DRG_RSLT_ID: string (nullable = true)
 |-- DRVR_DRG_CAT_1_ID: string (nullable = true)
 |-- PRSN_DEATH_TIME: string (nullable = true)
 |-- INCAP_INJRY_CNT: int

## Reading Units Data

In [9]:
# Reading Primary Person CSV Data in a spark dataframe
units_df = read_csv_df(filepaths.UNITS_DATA_PATH, header=True, inferSchema=True)

# Inferred Schema of primary person dataframe
units_df.printSchema()

root
 |-- CRASH_ID: integer (nullable = true)
 |-- UNIT_NBR: integer (nullable = true)
 |-- UNIT_DESC_ID: string (nullable = true)
 |-- VEH_PARKED_FL: string (nullable = true)
 |-- VEH_HNR_FL: string (nullable = true)
 |-- VEH_LIC_STATE_ID: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- VEH_MOD_YEAR: string (nullable = true)
 |-- VEH_COLOR_ID: string (nullable = true)
 |-- VEH_MAKE_ID: string (nullable = true)
 |-- VEH_MOD_ID: string (nullable = true)
 |-- VEH_BODY_STYL_ID: string (nullable = true)
 |-- EMER_RESPNDR_FL: string (nullable = true)
 |-- OWNR_ZIP: string (nullable = true)
 |-- FIN_RESP_PROOF_ID: string (nullable = true)
 |-- FIN_RESP_TYPE_ID: string (nullable = true)
 |-- VEH_DMAG_AREA_1_ID: string (nullable = true)
 |-- VEH_DMAG_SCL_1_ID: string (nullable = true)
 |-- FORCE_DIR_1_ID: string (nullable = true)
 |-- VEH_DMAG_AREA_2_ID: string (nullable = true)
 |-- VEH_DMAG_SCL_2_ID: string (nullable = true)
 |-- FORCE_DIR_2_ID: string (nullable = true)
 |--

## Reading Charges Data

In [10]:
# Reading Primary Person CSV Data in a spark dataframe
charges_df = read_csv_df(filepaths.CHARGES_DATA_PATH, header=True, inferSchema=True)

# Inferred Schema of primary person dataframe
charges_df.printSchema()

root
 |-- CRASH_ID: integer (nullable = true)
 |-- UNIT_NBR: integer (nullable = true)
 |-- PRSN_NBR: integer (nullable = true)
 |-- CHARGE: string (nullable = true)
 |-- CITATION_NBR: string (nullable = true)



## Reading Damages Data

In [11]:
damages_csv_path = './Data/Damages_use.csv'

# Reading Primary Person CSV Data in a spark dataframe
damages_df = read_csv_df(filepaths.DAMAGES_DATA_PATH, header=True, inferSchema=True)

# Inferred Schema of primary person dataframe
damages_df.printSchema()

root
 |-- CRASH_ID: integer (nullable = true)
 |-- DAMAGED_PROPERTY: string (nullable = true)



## Analysis 1: Find the number of crashes (accidents) in which number of persons killed are male?

In [9]:
# Filter accidents in which persons were killed should have PRSN_INJRY_SEV_ID set to 'KILLED'.
# Filter accidents involving male should have PRSN_GNDR_ID set to 'MALE'.
# After applying above two filters, get unique number of crashes/accidents using CRASH_ID.

# primary_person_df.select('PRSN_GNDR_ID').distinct().collect()
# primary_person_df.select('PRSN_INJRY_SEV_ID').distinct().collect()

num_crashes_male = primary_person_df.filter('PRSN_INJRY_SEV_ID="KILLED" AND PRSN_GNDR_ID="MALE"') \
                   .select('CRASH_ID').distinct().count()

print('Number of crashes(accidents) in which number of persons killed are male:', num_crashes_male)

Number of crashes(accidents) in which number of persons killed are male: 180


## Analysis 2: How many two wheelers are booked for crashes? 

In [10]:
# Filtering Units data for two wheelers i.e. 'POLICE MOTORCYCLE' OR 'MOTORCYCLE'
# Getting unique Vehicle body styles => units_df.select('VEH_BODY_STYL_ID').distinct().collect()
# Getting unique VIN(vehicle identification number) after applying the above filter.

two_wheeler_vehicle_body = ['POLICE MOTORCYCLE', 'MOTORCYCLE']

two_wheelers_crashes_df = units_df.filter(col('VEH_BODY_STYL_ID').isin(two_wheeler_vehicle_body)) \
                            .select('VIN').filter(col('VIN').isNotNull())

print('Number of two wheelers which are booked for crashes:', two_wheelers_crashes_df.distinct().count())

Number of two wheelers which are booked for crashes: 766


## Analysis 3: Which state has highest number of accidents in which females are involved? 

In [11]:
# Filter accidents involving FEMALE should have PRSN_GNDR_ID set to 'FEMALE'
# Getting unique vehicle types => primary_person_df.select('PRSN_GNDR_ID').distinct().collect()
# Assuming the state of accident same as driver licence state

accident_female_df = primary_person_df.filter('PRSN_GNDR_ID="FEMALE"')
print('Number of crashes(accidents) in which number of persons are female:', \
          accident_female_df.select(['DRVR_LIC_STATE_ID', 'CRASH_ID']).distinct() \
          .groupby('DRVR_LIC_STATE_ID').count().orderBy(desc('count')).first()[0])

Number of crashes(accidents) in which number of persons are female: Texas


## Analysis 4: Which are the Top 5th to 15th VEH_MAKE_IDs that contribute to a largest number of injuries including death

In [12]:
# List of values pertaining to injury including death
# [row.PRSN_INJRY_SEV_ID for row in primary_person_df.select('PRSN_INJRY_SEV_ID').distinct().collect()]
injury_flag_list = ['KILLED', 'NON-INCAPACITATING INJURY', 'POSSIBLE INJURY', 'INCAPACITATING INJURY']

# Joining Units dataset with rimary person dataset to filter only the cashes involving injuries 
person_units_inner_join_df = primary_person_df.filter(col('PRSN_INJRY_SEV_ID').isin(injury_flag_list)) \
                            .select('CRASH_ID', 'UNIT_NBR') \
                            .join(units_df.select(['CRASH_ID', 'UNIT_NBR', 'VEH_MAKE_ID']), ['CRASH_ID', 'UNIT_NBR'], 'inner')

# Counting unique crashes fo each VEH_MAKE_ID(Eliminating cases where VEH_MAKE_ID is NA)
person_veh_make_df = person_units_inner_join_df.select('CRASH_ID', 'VEH_MAKE_ID').filter('VEH_MAKE_ID!="NA"') \
                    .distinct().groupby('VEH_MAKE_ID').count().orderBy(desc('count'))

# Assiging rank based on the crashes/count of each vehicle make
windowSpec = Window.orderBy(desc('count'))
person_veh_make_df = person_veh_make_df.withColumn('dense_rank', dense_rank().over(windowSpec))

# Filtering top 5th to 15th VEH_MKE_IDs
person_veh_make_df.filter('dense_rank>=5 AND dense_rank<=15').select(['VEH_MAKE_ID']).show(truncate=False)

+-----------+
|VEH_MAKE_ID|
+-----------+
|NISSAN     |
|HONDA      |
|GMC        |
|HYUNDAI    |
|KIA        |
|JEEP       |
|CHRYSLER   |
|MAZDA      |
|PONTIAC    |
|VOLKSWAGEN |
|LEXUS      |
+-----------+



## Analysis 5: For all the body styles involved in crashes, mention the top ethnic user group of each unique body style

In [13]:
# Add logic to only use required columns not all whle joining two big datasets
person_units_join_df = primary_person_df.join(units_df, ['CRASH_ID', 'UNIT_NBR'], 'outer')
# person_units_join_df = primary_person_df.join(units_df, ['CRASH_ID', 'UNIT_NBR'], 'inner')

# person_units_join_df.select(['PRSN_ETHNICITY_ID', 'VEH_BODY_STYL_ID', 'CRASH_ID', 'UNIT_NBR']).coalesce(1).write.csv('./Hello.csv', header=True)
body_ethnic_df = person_units_join_df.select(['PRSN_ETHNICITY_ID', 'VEH_BODY_STYL_ID', 'CRASH_ID']) \
                .filter('VEH_BODY_STYL_ID!="NA" AND VEH_BODY_STYL_ID!="UNKNOWN"').distinct() \
                .groupby(['VEH_BODY_STYL_ID','PRSN_ETHNICITY_ID']) \
                .agg(count('CRASH_ID').alias('crash_count'))

# Using dense rank to find the top ethnicity for each vehicle body styles
windowSpec = Window.partitionBy('VEH_BODY_STYL_ID').orderBy(desc('crash_count'))
body_ethnic_df = body_ethnic_df.withColumn('dense_rank', dense_rank().over(windowSpec))

body_ethnic_df.filter('dense_rank=1').select(['VEH_BODY_STYL_ID','PRSN_ETHNICITY_ID']) \
                .orderBy('VEH_BODY_STYL_ID','PRSN_ETHNICITY_ID').show(truncate=False)



+---------------------------------+-----------------+
|VEH_BODY_STYL_ID                 |PRSN_ETHNICITY_ID|
+---------------------------------+-----------------+
|AMBULANCE                        |WHITE            |
|BUS                              |BLACK            |
|FARM EQUIPMENT                   |WHITE            |
|FIRE TRUCK                       |WHITE            |
|MOTORCYCLE                       |WHITE            |
|NEV-NEIGHBORHOOD ELECTRIC VEHICLE|WHITE            |
|NOT REPORTED                     |WHITE            |
|OTHER  (EXPLAIN IN NARRATIVE)    |WHITE            |
|PASSENGER CAR, 2-DOOR            |WHITE            |
|PASSENGER CAR, 4-DOOR            |WHITE            |
|PICKUP                           |WHITE            |
|POLICE CAR/TRUCK                 |WHITE            |
|POLICE MOTORCYCLE                |WHITE            |
|SPORT UTILITY VEHICLE            |WHITE            |
|TRUCK                            |WHITE            |
|TRUCK TRACTOR              

## Analysis 6: Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash (Use Driver Zip Code)

In [14]:
# Assuming the below vehichle body styles depict cars
car_types = ['PASSENGER CAR, 2-DOOR', 'PASSENGER CAR, 4-DOOR', 'POLICE CAR/TRUCK', 'SPORT UTILITY VEHICLE', 
             'NEV-NEIGHBORHOOD ELECTRIC VEHICLE', 'VAN']

# Filtering for persons with positive alcohol result
# Filtering for vehicles being car
# Ignoring records with driver zip not being null
crashed_car_alcohol_zip_df = person_units_join_df.filter((col('PRSN_ALC_RSLT_ID')=='Positive') \
                            & (col('DRVR_ZIP').isNotNull()) & (col('VEH_BODY_STYL_ID').isin(car_types))) \
                            .select(['CRASH_ID', 'DRVR_ZIP', 'VEH_BODY_STYL_ID']).distinct()

crashed_car_alcohol_zip_df.select(['CRASH_ID', 'DRVR_ZIP']).distinct().groupby('DRVR_ZIP') \
    .agg(count('CRASH_ID').alias('crash_count')).orderBy(desc('crash_count')).limit(5).show()

+--------+-----------+
|DRVR_ZIP|crash_count|
+--------+-----------+
|   78521|         45|
|   79936|         35|
|   76010|         35|
|   79938|         31|
|   78741|         27|
+--------+-----------+



## Analysis 7: Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance

In [15]:
# Damage flag which depict damage level above 4
damage_flag = ['DAMAGED 5', 'DAMAGED 6', 'DAMAGED 7 HIGHEST']

# Adding Damages data to Units Data having damage level above 4(Join)
damages_units_join_df = units_df.filter((col('VEH_DMAG_SCL_1_ID').isin(damage_flag)) | (col('VEH_DMAG_SCL_2_ID').isin(damage_flag))) \
                        .join(damages_df, ['CRASH_ID'], 'inner')

# No Damages Flags
no_damage_property_flag = [row.DAMAGED_PROPERTY \
                           for row in list(damages_units_join_df.select('DAMAGED_PROPERTY').distinct().collect()) \
                           if 'NO DAMAGE' in str(row.DAMAGED_PROPERTY).upper()]

# List of unique Financial Responsibility Type
# distinct_fin_res_id = [row.FIN_RESP_TYPE_ID for row in list(units_df.select('FIN_RESP_TYPE_ID').distinct().collect())]
# List of Responsibility type pertaining to insurance and it's type
# insurance_flag = ['INSURANCE BINDER', 'LIABILITY INSURANCE POLICY', 'CERTIFICATE OF SELF-INSURANCE',
#                   'CERTIFICATE OF DEPOSIT WITH COUNTY JUDGE', 'CERTIFICATE OF DEPOSIT WITH COMPTROLLER',
#                   'SURETY BOND', 'PROOF OF LIABILITY INSURANCE']

non_insurance_flag = ['NA']

# Filtering for vehicles which have insurance and the vehicles are of car type
# Filtering for vehicles which didnt damage property
damages_units_join_df.filter((~col('FIN_RESP_TYPE_ID').isin(non_insurance_flag)) \
                & (person_units_join_df.VEH_BODY_STYL_ID.isin(car_types)) \
                & (col('DAMAGED_PROPERTY').isin(no_damage_property_flag)) \
               ).select('CRASH_ID').distinct().count()                                                                                           

2

## Analysis 8: Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, uses top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)

In [16]:
# Speeding related offences flags
speeding_flag = [row.CHARGE for row in list(charges_df.select('CHARGE').distinct().collect()) if 'SPEED' in str(row.CHARGE).upper()]

# offense_flag = [row.CHARGE for row in list(charges_df.select('CHARGE').distinct().collect()) if 'OFFENSE' in str(row.CHARGE).upper()]

# Filtering for car crashes
car_types = ['PASSENGER CAR, 2-DOOR', 'PASSENGER CAR, 4-DOOR', 'POLICE CAR/TRUCK', 'SPORT UTILITY VEHICLE', 
             'NEV-NEIGHBORHOOD ELECTRIC VEHICLE', 'VAN']

# DRIVER LICENSE TYPE
license_type_id = ['COMMERCIAL DRIVER LIC.', 'OCCUPATIONAL', 'DRIVER LICENSE'] 

# List of 25 states with highest car crashes(Assuming crash can be cosidered as an offense)
states_highest_car_crash = [row.VEH_LIC_STATE_ID for row in list(units_df.filter(person_units_join_df.VEH_BODY_STYL_ID.isin(car_types)).select(['CRASH_ID', 'VEH_LIC_STATE_ID']).distinct().groupby('VEH_LIC_STATE_ID').count().orderBy(desc('count')).limit(25).collect())]

# Top 10 used vehicle colours
top_vehicle_colors = [row.VEH_COLOR_ID for row in \
                      list(units_df.select('CRASH_ID', 'VEH_COLOR_ID').filter('VEH_COLOR_ID!="NA"') \
                           .groupby('VEH_COLOR_ID').count().orderBy(desc('count')).select('VEH_COLOR_ID') \
                           .limit(10).collect())]

person_units_join_df.filter((col('VEH_COLOR_ID').isin(top_vehicle_colors)) \
    & (col('VEH_LIC_STATE_ID').isin(states_highest_car_crash)) \
    & (col('DRVR_LIC_TYPE_ID').isin(license_type_id))) \
    .join(charges_df.filter(col('CHARGE').isin(speeding_flag)), ['CRASH_ID', 'UNIT_NBR'], 'inner') \
    .select(['VEH_MAKE_ID', 'CRASH_ID', 'UNIT_NBR']).distinct() \
    .groupby('VEH_MAKE_ID').count().orderBy(desc('count')).select('VEH_MAKE_ID').limit(5).show()


+-----------+
|VEH_MAKE_ID|
+-----------+
|       FORD|
|  CHEVROLET|
|     TOYOTA|
|      DODGE|
|      HONDA|
+-----------+

