# IMPORT DEPENDENT LIBRARIES

In [40]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import json
import os

pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)

# LOAD CONFIG

In [42]:
with open("./config.json", "r") as json_file:
    config = json.load(json_file)

input_paths = config["input_data"]
input_format = config["data_formats"]["input_format"]

output_paths = config["output_data"]
output_format = config["data_formats"]["output_format"]

# CREATE SPARK-ENTRY POINT

In [43]:
os.environ["JAVA_HOME"] = "C:\Program Files\Java\jdk"

spark = SparkSession.builder.appName("BCGX_Case_Study").getOrCreate()

# DATA IMPORTS

In [44]:
charges_df = spark.read.format(input_format).option("header", "true").option("inferSchema", "true").load(input_paths['Charges'])
damages_df = spark.read.format(input_format).option("header", "true").option("inferSchema", "true").load(input_paths['Damages'])
endorse_df = spark.read.format(input_format).option("header", "true").option("inferSchema", "true").load(input_paths['Endorse'])
primary_person_df = spark.read.format(input_format).option("header", "true").option("inferSchema", "true").load(input_paths['Primary_Person'])
units_df = spark.read.format(input_format).option("header", "true").option("inferSchema", "true").load(input_paths['Units'])
restrict_df = spark.read.format(input_format).option("header", "true").option("inferSchema", "true").load(input_paths['Restrict'])

# ANALYSIS BEGINS HERE

## ANALYSIS 1


#### Find the number of crashes (accidents) in which number of males killed are greater than 2?

In [45]:
a1_01 = primary_person_df.where(trim(col("PRSN_INJRY_SEV_ID")) == "KILLED").groupBy("CRASH_ID", "PRSN_GNDR_ID").agg(count("*").alias("CRASH_GENDERWISE_FATALITY_COUNT"))
a1_02 = a1_01.where((col("CRASH_GENDERWISE_FATALITY_COUNT") > 2) & (trim(col("PRSN_GNDR_ID")) == "MALE"))
a1_02.count()

0

## ANALYSIS 2

#### How many two wheelers are booked for crashes? 

In [46]:
two_wheelers = ["MOTORCYCLE", "POLICE MOTORCYCLE"]

a2_01 = units_df.where(trim(col("VEH_BODY_STYL_ID")).isin(two_wheelers))
a2_02 = a2_01.select("CRASH_ID", "UNIT_NBR").distinct()
a2_02.count()

773

## ANALYSIS 3

#### Determine the Top 5 Vehicle Makes of the cars present in the crashes in which driver died and Airbags did not deploy.

In [47]:
# get crash-specific info
a3_01 = primary_person_df.where((trim(col("PRSN_TYPE_ID")) == "DRIVER") & (trim(col("PRSN_INJRY_SEV_ID")) == "KILLED") & (trim(col("PRSN_AIRBAG_ID")) == "NOT DEPLOYED"))
a3_02 = a3_01.select("CRASH_ID", "UNIT_NBR").distinct()

# get vehicle-specific info
a3_03 = units_df.select("CRASH_ID", "UNIT_NBR", "VEH_MAKE_ID").distinct()

# bring the two together
a3_04 = a3_02.join(a3_03, on = ["CRASH_ID", "UNIT_NBR"], how = "left")

a3_05 = a3_04.groupBy("VEH_MAKE_ID").agg(count('*').alias("COUNT_REQ_CRASHES_PER_MAKE")).orderBy(col("COUNT_REQ_CRASHES_PER_MAKE").desc())
a3_06 = a3_05.select("VEH_MAKE_ID", "COUNT_REQ_CRASHES_PER_MAKE").limit(5)
a3_06.show()

+-----------+--------------------------+
|VEH_MAKE_ID|COUNT_REQ_CRASHES_PER_MAKE|
+-----------+--------------------------+
|  CHEVROLET|                         8|
|       FORD|                         6|
|     NISSAN|                         5|
|      DODGE|                         2|
|      HONDA|                         2|
+-----------+--------------------------+



## ANALYSIS 4
#### Determine number of Vehicles with driver having valid licences involved in hit and run? 

In [48]:
# required license types
valid_license_types = ["DRIVER LICENSE", "COMMERCIAL DRIVER LIC."]

# get legit licensed driver crash records
a4_01 = primary_person_df.where(trim(col("DRVR_LIC_TYPE_ID")).isin(valid_license_types)).select("CRASH_ID", "UNIT_NBR").distinct()
# get hit and run crash records
a4_02 = units_df.where(trim(col("VEH_HNR_FL")) == "Y").select("CRASH_ID", "UNIT_NBR").distinct()

# bring the two together
a4_03 = a4_01.join(a4_02, on = ["CRASH_ID", "UNIT_NBR"], how = "inner")
a4_03.count()

2569

## ANALYSIS 5
#### Which state has highest number of accidents in which females are not involved? 

In [49]:
# let's first seggregate the crashes doesn't involve women

a5_01 = primary_person_df.groupBy("CRASH_ID").agg(collect_set("PRSN_GNDR_ID").alias("CRASH_GENDER_POOL"))
a5_02 = a5_01.where((size(col("CRASH_GENDER_POOL")) == 1) & (trim(col("CRASH_GENDER_POOL")[0]) == "MALE")).select("CRASH_ID").distinct()

# Let's now get the corresponding states for those crashes
a5_03 = units_df.select("CRASH_ID", "VEH_LIC_STATE_ID").distinct()

a5_04 = a5_02.join(a5_03, on = ["CRASH_ID"], how = "left").select("VEH_LIC_STATE_ID")
a5_05 = a5_04.groupBy("VEH_LIC_STATE_ID").agg(count("*").alias("STATE_COUNT")).orderBy(col("STATE_COUNT").desc())
a5_06 = a5_05.limit(1).select("VEH_LIC_STATE_ID","STATE_COUNT")
a5_06.show()

+----------------+-----------+
|VEH_LIC_STATE_ID|STATE_COUNT|
+----------------+-----------+
|              TX|      36588|
+----------------+-----------+



## ANALYSIS 6
#### Which are the Top 3rd to 5th VEH_MAKE_IDs that contribute to a largest number of injuries including death?

In [50]:
# records corresponding to all possible injuries
req_injury_sev = ["NOT INJURED", "NA", "UNKNOWN"]
a6_01 = primary_person_df.where(~trim(col("PRSN_INJRY_SEV_ID")).isin(req_injury_sev)).select("CRASH_ID", "UNIT_NBR").distinct()

# get the make_id of vehicles involved in all crashes
a6_02 = units_df.select("CRASH_ID", "UNIT_NBR", "VEH_MAKE_ID").distinct()

#bring the two together
a6_03 = a6_01.join(a6_02, on = ["CRASH_ID", "UNIT_NBR"], how = "left")
a6_04 = a6_03.groupBy("VEH_MAKE_ID").agg(count("*").alias("COUNT_BY_MAKE_ID"))

# Get those between Rank 3 & 5
w2 = Window.orderBy(col("COUNT_BY_MAKE_ID").desc())
a6_05 = a6_04.withColumn("RANK_BY_CRASH_COUNT", rank().over(w2))
a6_06 = a6_05.select("VEH_MAKE_ID","RANK_BY_CRASH_COUNT").where((col("RANK_BY_CRASH_COUNT") > 2))
a6_07 = a6_06.where((col("RANK_BY_CRASH_COUNT") < 6))
a6_07.show()

+-----------+-------------------+
|VEH_MAKE_ID|RANK_BY_CRASH_COUNT|
+-----------+-------------------+
|     TOYOTA|                  3|
|      DODGE|                  4|
|     NISSAN|                  5|
+-----------+-------------------+



## ANALYSIS 7
#### For all the body styles involved in crashes, mention the top ethnic user group of each unique body style  

In [51]:
# get body-style info for each crash unit
a7_01 = units_df.select("CRASH_ID", "UNIT_NBR", "VEH_BODY_STYL_ID").distinct()

# get ethnicity of person involved in crash
a7_02 = primary_person_df.select("CRASH_ID", "UNIT_NBR", "PRSN_ETHNICITY_ID").distinct()

# bring them together
a7_03 = a7_01.join(a7_02, on = ["CRASH_ID", "UNIT_NBR"], how = "inner").drop("CRASH_ID", "UNIT_NBR")

a7_04 = a7_03.groupBy("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID").agg(count("*").alias("count_ethnic_bdystyl_group"))#.show()

w3 = Window.partitionBy("VEH_BODY_STYL_ID").orderBy(col("count_ethnic_bdystyl_group").desc())
a7_05 = a7_04.withColumn("rank_ethnic_bdystyl_group", rank().over(w3))
a7_06 = a7_05.where(col("rank_ethnic_bdystyl_group") == 1).select("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID")
a7_06 = a7_06.withColumnRenamed("PRSN_ETHNICITY_ID", "TOP_ETHNIC_USER")
a7_06.show(truncate = False)

+---------------------------------+---------------+
|VEH_BODY_STYL_ID                 |TOP_ETHNIC_USER|
+---------------------------------+---------------+
|AMBULANCE                        |WHITE          |
|BUS                              |BLACK          |
|FARM EQUIPMENT                   |WHITE          |
|FIRE TRUCK                       |WHITE          |
|MOTORCYCLE                       |WHITE          |
|NA                               |WHITE          |
|NEV-NEIGHBORHOOD ELECTRIC VEHICLE|WHITE          |
|NOT REPORTED                     |WHITE          |
|OTHER  (EXPLAIN IN NARRATIVE)    |WHITE          |
|PASSENGER CAR, 2-DOOR            |WHITE          |
|PASSENGER CAR, 4-DOOR            |WHITE          |
|PICKUP                           |WHITE          |
|POLICE CAR/TRUCK                 |WHITE          |
|POLICE MOTORCYCLE                |WHITE          |
|SPORT UTILITY VEHICLE            |WHITE          |
|TRUCK                            |WHITE          |
|TRUCK TRACT

## ANALYSIS 8
#### Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash (Use Driver Zip Code)

In [52]:
alcohol_contrib_factors = ["UNDER INFLUENCE - ALCOHOL", "HAD BEEN DRINKING"]

# get crash-victims with alcohol as the contributing factor
a8_01 = units_df.where(trim(col("CONTRIB_FACTR_1_ID")).isin(alcohol_contrib_factors)).select("CRASH_ID", "UNIT_NBR").distinct()
# get zip info of crash-victims
a8_02 = primary_person_df.select("CRASH_ID", "UNIT_NBR", "DRVR_ZIP").distinct()

# bring both together
a8_03 = a8_01.join(a8_02, on = ["CRASH_ID", "UNIT_NBR"], how = "left")

# get the count of alcohol induced crashes by driver zip
a8_04 = a8_03.groupBy("DRVR_ZIP").agg(count('*').alias("DND_CRASH_COUNT_ZIP")).orderBy(col("DND_CRASH_COUNT_ZIP").desc()).limit(6)
a8_05 = a8_04.where(col("DRVR_ZIP").isNotNull()).distinct()
a8_05.show()

+--------+-------------------+
|DRVR_ZIP|DND_CRASH_COUNT_ZIP|
+--------+-------------------+
|   78521|                 34|
|   78741|                 33|
|   75067|                 32|
|   78130|                 30|
|   78586|                 29|
+--------+-------------------+



## ANALYSIS 9
#### Count of Distinct Crash IDs where no damaged property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance

In [53]:
req_damage_levels = ["DAMAGED 5","DAMAGED 6","DAMAGED 7 HIGHEST"]
# get crashes with req damage levels and valid insurance 
a9_01 = units_df.where((trim(col('VEH_DMAG_SCL_1_ID')).isin(req_damage_levels) | trim(col('VEH_DMAG_SCL_2_ID')).isin(req_damage_levels)))
a9_02 = a9_01.where(trim(col('FIN_RESP_TYPE_ID')).contains("INSURANCE")).select("CRASH_ID").distinct()

# get no damage crashes from above with an anti-left join with damages_df
a9_03 = damages_df.select("CRASH_ID").distinct()

a9_04 = a9_02.join(a9_03, on = ["CRASH_ID"], how = 'leftanti')
a9_04.count()

8849

## ANALYSIS 10
#### Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, used top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)

In [54]:
# get account of drivers charged with speeding related offense
a10_01 = charges_df.filter(col('CHARGE').contains('SPEED')).select('CRASH_ID','UNIT_NBR','CHARGE')

# get top 10 used vehicle colors
a10_02 = units_df.filter("VEH_COLOR_ID != 'NA'").select('VEH_COLOR_ID').groupBy('VEH_COLOR_ID').agg(count('*').alias("COUNT_BY_COLORS"))
a10_03 = a10_02.withColumn('RANK_TOP_COLORS',rank().over(Window.orderBy(col('COUNT_BY_COLORS').desc()))).filter(col("RANK_TOP_COLORS") <= 10).select('VEH_COLOR_ID')

# get top 25 states
a10_04 = units_df.filter(~col('VEH_LIC_STATE_ID').isin(['NA','Unknown','Other'])).select('VEH_LIC_STATE_ID').groupBy('VEH_LIC_STATE_ID').agg(count('*').alias("COUNT_BY_STATE"))
a10_05 = a10_04.withColumn('RANK_TOP_STATES',rank().over(Window.orderBy(col('COUNT_BY_STATE').desc()))).filter(col("RANK_TOP_STATES") <= 25).select('VEH_LIC_STATE_ID')

# get crashes corresponding to top 10 colors
a10_06 = units_df.join(a10_03,on=['VEH_COLOR_ID'],how='inner').select('CRASH_ID','UNIT_NBR','VEH_MAKE_ID','VEH_LIC_STATE_ID')

# further, get crashes corresponding to speeding related offense
a10_07 = a10_06.join(a10_01,on=['CRASH_ID','UNIT_NBR'],how='inner').select('CRASH_ID','UNIT_NBR','VEH_MAKE_ID','VEH_LIC_STATE_ID')

#furthur, get the crashes that occured only in the top 25 states
a10_08 = a10_07.join(a10_05, on=['VEH_LIC_STATE_ID'],how='inner').select('CRASH_ID','UNIT_NBR','VEH_MAKE_ID')

#crashes corresponding to licensed drivers
a10_09 = primary_person_df.filter(~trim(col("DRVR_LIC_CLS_ID")).isin(['UNLICENSED', "NA", "UNKNOWN"])).select('CRASH_ID','UNIT_NBR')

#bring them together
a10_10 = a10_09.join(a10_08,on=['CRASH_ID','UNIT_NBR'],how='inner').select('VEH_MAKE_ID').groupBy('VEH_MAKE_ID').agg(count('*').alias("MAKEWISE_COUNT")) 
a10_11 = a10_10.orderBy(col("MAKEWISE_COUNT").desc()).limit(5)

a10_11.show()

+-----------+--------------+
|VEH_MAKE_ID|MAKEWISE_COUNT|
+-----------+--------------+
|       FORD|          4270|
|  CHEVROLET|          3707|
|     TOYOTA|          2267|
|      DODGE|          1931|
|      HONDA|          1385|
+-----------+--------------+

