In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, countDistinct
from pyspark.sql.window import Window


In [3]:
spark = SparkSession.builder.appName("BCG Analysis").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/09 23:48:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/09 23:48:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
spark

# Analysis 1: Find the number of crashes (accidents) in which number of persons killed are male?

#### According to data dictionary - Primary_Person_use.csv has the Car crash info with gender details - PRSN_GNDR_ID

In [5]:
# Read the Primary_Person_use.csv
Primary_person_use_df = spark.read.csv("Data/Primary_Person_use.csv", header=True, inferSchema=True)
Primary_person_use_df.cache()
Primary_person_use_df.head(5)
Primary_person_use_df.printSchema()

[Stage 1:>                                                        (0 + 10) / 10]                                                                                

22/09/09 23:48:56 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
root
 |-- CRASH_ID: integer (nullable = true)
 |-- UNIT_NBR: integer (nullable = true)
 |-- PRSN_NBR: integer (nullable = true)
 |-- PRSN_TYPE_ID: string (nullable = true)
 |-- PRSN_OCCPNT_POS_ID: string (nullable = true)
 |-- PRSN_INJRY_SEV_ID: string (nullable = true)
 |-- PRSN_AGE: string (nullable = true)
 |-- PRSN_ETHNICITY_ID: string (nullable = true)
 |-- PRSN_GNDR_ID: string (nullable = true)
 |-- PRSN_EJCT_ID: string (nullable = true)
 |-- PRSN_REST_ID: string (nullable = true)
 |-- PRSN_AIRBAG_ID: string (nullable = true)
 |-- PRSN_HELMET_ID: string (nullable = true)
 |-- PRSN_SOL_FL: string (nullable = true)
 |-- PRSN_ALC_SPEC_TYPE_ID: string (nullable = true)
 |-- PRSN_ALC_RSLT_ID: string (nullable = true)
 |-- PRSN_BAC_TEST_RSLT: string (nullable = true)
 |-- PRSN_DRG_SPEC_TYPE_ID: string (nullab

In [6]:
# Gender wise crash count
Primary_person_use_df.groupBy(col("PRSN_GNDR_ID")).count().show()



+------------+-----+
|PRSN_GNDR_ID|count|
+------------+-----+
|          NA|   19|
|     UNKNOWN| 1212|
|        MALE|96782|
|      FEMALE|58941|
+------------+-----+



                                                                                

In [7]:
# Filtering the DF where gender is Male
male_car_crash_df = Primary_person_use_df.filter(col("PRSN_GNDR_ID") == "MALE")

# Number of crashes in which person killed is Male
print(f"Number of crashes in which person killed is Male: {male_car_crash_df.count()}")

Number of crashes in which person killed is Male: 96782


## Number of crashes in which person killed is Male: 96782

# Analysis 2: How many two wheelers are booked for crashes? 

#### According to data dictionary - Units_use.csv has the vehicle type info - VEH_BODY_STYL_ID

In [8]:
# Read the Units_use.csv
Units_use_df = spark.read.csv("Data/Units_use.csv", header=True, inferSchema=True)
Units_use_df.cache()
Units_use_df.head(5)
Units_use_df.printSchema()

root
 |-- CRASH_ID: integer (nullable = true)
 |-- UNIT_NBR: integer (nullable = true)
 |-- UNIT_DESC_ID: string (nullable = true)
 |-- VEH_PARKED_FL: string (nullable = true)
 |-- VEH_HNR_FL: string (nullable = true)
 |-- VEH_LIC_STATE_ID: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- VEH_MOD_YEAR: string (nullable = true)
 |-- VEH_COLOR_ID: string (nullable = true)
 |-- VEH_MAKE_ID: string (nullable = true)
 |-- VEH_MOD_ID: string (nullable = true)
 |-- VEH_BODY_STYL_ID: string (nullable = true)
 |-- EMER_RESPNDR_FL: string (nullable = true)
 |-- OWNR_ZIP: string (nullable = true)
 |-- FIN_RESP_PROOF_ID: string (nullable = true)
 |-- FIN_RESP_TYPE_ID: string (nullable = true)
 |-- VEH_DMAG_AREA_1_ID: string (nullable = true)
 |-- VEH_DMAG_SCL_1_ID: string (nullable = true)
 |-- FORCE_DIR_1_ID: string (nullable = true)
 |-- VEH_DMAG_AREA_2_ID: string (nullable = true)
 |-- VEH_DMAG_SCL_2_ID: string (nullable = true)
 |-- FORCE_DIR_2_ID: string (nullable = true)
 |--

In [9]:
# Vehicle type wise crash count
Units_use_df.groupBy(col("VEH_BODY_STYL_ID")).count().show(truncate=False)

+---------------------------------+-----+
|VEH_BODY_STYL_ID                 |count|
+---------------------------------+-----+
|BUS                              |463  |
|NA                               |5743 |
|VAN                              |5659 |
|PICKUP                           |36799|
|SPORT UTILITY VEHICLE            |33310|
|PASSENGER CAR, 4-DOOR            |65559|
|FIRE TRUCK                       |106  |
|TRUCK                            |4534 |
|UNKNOWN                          |1631 |
|AMBULANCE                        |87   |
|POLICE CAR/TRUCK                 |399  |
|MOTORCYCLE                       |781  |
|YELLOW SCHOOL BUS                |279  |
|PASSENGER CAR, 2-DOOR            |11033|
|TRUCK TRACTOR                    |6532 |
|FARM EQUIPMENT                   |57   |
|NEV-NEIGHBORHOOD ELECTRIC VEHICLE|6    |
|OTHER  (EXPLAIN IN NARRATIVE)    |516  |
|POLICE MOTORCYCLE                |3    |
|NOT REPORTED                     |2    |
+---------------------------------

In [10]:
distinct_unit_id = Units_use_df.groupBy(col("UNIT_DESC_ID")).count()
distinct_unit_id.show(truncate=False)

+-----------------------------+------+
|UNIT_DESC_ID                 |count |
+-----------------------------+------+
|PEDESTRIAN                   |520   |
|NON-CONTACT                  |302   |
|TRAIN                        |46    |
|MOTOR VEHICLE                |167456|
|OTHER  (EXPLAIN IN NARRATIVE)|38    |
|PEDALCYCLIST                 |164   |
|TOWED/TRAILER                |4965  |
|MOTORIZED CONVEYANCE         |8     |
+-----------------------------+------+



In [11]:
# Filtering the DF where vehicle body type is Motorcycle (2 wheeler)
two_wheeler_crash_df = Units_use_df.filter(
    (col("VEH_BODY_STYL_ID").like("%MOTORCYCLE%") | (col("UNIT_DESC_ID") == "PEDALCYCLIST")))



# Number of two wheelers booked for crashes
print(f"Number of two wheelers booked for crashes: {two_wheeler_crash_df.count()}")

Number of two wheelers booked for crashes: 948


## Number of two wheelers booked for crashes: 948

# Analysis 3: Which state has the highest number of accidents in which females are involved? 

#### According to data dictionary - Primary_Person_use.csv has the Car crash info with gender and state details - PRSN_GNDR_ID, DRVR_LIC_STATE_ID

#### We already have the DF available as Primary_Person_use_df

In [12]:
accident_info_df = Primary_person_use_df.filter(col("PRSN_GNDR_ID") == "FEMALE").groupBy(
    col("DRVR_LIC_STATE_ID")).count().orderBy(col("count").desc())



accident_info_df.limit(1).show()





+-----------------+-----+
|DRVR_LIC_STATE_ID|count|
+-----------------+-----+
|            Texas|53319|
+-----------------+-----+



In [13]:
print(f"State with highest number of accidents in which females are involved: {accident_info_df.first()['DRVR_LIC_STATE_ID']}")



State with highest number of accidents in which females are involved: Texas


## State with highest no. of accidents in which females are involved: - 
### Texas, 53319 accidents

# Analysis 4: Which are the Top 5th to 15th VEH_MAKE_IDs that contribute to a largest number of injuries including death

#### According to data dictionary - Units_use.csv has the Car crash info with injury count, death and VEH_MAKE_ID info


#### We already have the DF available as Units_use_df

In [14]:
injury_and_vehicle_info_df = Units_use_df.select(col("CRASH_ID"), 
                                                 col("VEH_MAKE_ID"),
                                                col("TOT_INJRY_CNT"),
                                                col("DEATH_CNT")).\
                                                withColumn("ALL_INJURIES", Units_use_df["TOT_INJRY_CNT"] + Units_use_df["DEATH_CNT"]).\
                                                orderBy(col("ALL_INJURIES").desc())

injury_and_vehicle_info_df.show(10)
                                                                                         
                                                                                         
                                                                                        

+--------+------------------+-------------+---------+------------+
|CRASH_ID|       VEH_MAKE_ID|TOT_INJRY_CNT|DEATH_CNT|ALL_INJURIES|
+--------+------------------+-------------+---------+------------+
|15156583|NORTH AMERICAN BUS|           31|        0|          31|
|15156583|NORTH AMERICAN BUS|           31|        0|          31|
|14985400|NORTH AMERICAN BUS|           15|        0|          15|
|14985400|NORTH AMERICAN BUS|           15|        0|          15|
|14957289|         BLUE BIRD|           11|        0|          11|
|14957289|         BLUE BIRD|           11|        0|          11|
|15512740|NORTH AMERICAN BUS|           11|        0|          11|
|15512740|NORTH AMERICAN BUS|           11|        0|          11|
|15139734|           LINCOLN|           10|        0|          10|
|15375528|          VAN HOOL|           10|        0|          10|
+--------+------------------+-------------+---------+------------+
only showing top 10 rows



In [15]:
vehicle_wise_injuries_df = injury_and_vehicle_info_df.filter(injury_and_vehicle_info_df["VEH_MAKE_ID"] != "NA").\
                                     groupBy("VEH_MAKE_ID").\
                                     sum("ALL_INJURIES").\
                                     withColumnRenamed("sum(ALL_INJURIES)", "total_injuries").\
                                     orderBy(col("total_injuries").desc()).limit(15)
vehicle_wise_injuries_df.show()




+-----------+--------------+
|VEH_MAKE_ID|total_injuries|
+-----------+--------------+
|  CHEVROLET|          7024|
|       FORD|          6992|
|     TOYOTA|          4228|
|      DODGE|          3146|
|     NISSAN|          3118|
|      HONDA|          2892|
|        GMC|          1256|
|    HYUNDAI|          1103|
|        KIA|          1049|
|       JEEP|           989|
|   CHRYSLER|           956|
|      MAZDA|           711|
| VOLKSWAGEN|           582|
|    PONTIAC|           564|
|      LEXUS|           523|
+-----------+--------------+



In [16]:
# Filtering from 5 to 15 rows
vehicle_wise_injuries_df_5_to_15 = vehicle_wise_injuries_df.tail(11)
vehicle_wise_injuries_df_5_to_15


[Row(VEH_MAKE_ID='NISSAN', total_injuries=3118),
 Row(VEH_MAKE_ID='HONDA', total_injuries=2892),
 Row(VEH_MAKE_ID='GMC', total_injuries=1256),
 Row(VEH_MAKE_ID='HYUNDAI', total_injuries=1103),
 Row(VEH_MAKE_ID='KIA', total_injuries=1049),
 Row(VEH_MAKE_ID='JEEP', total_injuries=989),
 Row(VEH_MAKE_ID='CHRYSLER', total_injuries=956),
 Row(VEH_MAKE_ID='MAZDA', total_injuries=711),
 Row(VEH_MAKE_ID='VOLKSWAGEN', total_injuries=582),
 Row(VEH_MAKE_ID='PONTIAC', total_injuries=564),
 Row(VEH_MAKE_ID='LEXUS', total_injuries=523)]

In [17]:
count = 5
for vehicle_info in vehicle_wise_injuries_df_5_to_15:
    print(f"Vehicle number {count} -- {vehicle_info[0]}: {vehicle_info[1]}")
    count += 1

Vehicle number 5 -- NISSAN: 3118
Vehicle number 6 -- HONDA: 2892
Vehicle number 7 -- GMC: 1256
Vehicle number 8 -- HYUNDAI: 1103
Vehicle number 9 -- KIA: 1049
Vehicle number 10 -- JEEP: 989
Vehicle number 11 -- CHRYSLER: 956
Vehicle number 12 -- MAZDA: 711
Vehicle number 13 -- VOLKSWAGEN: 582
Vehicle number 14 -- PONTIAC: 564
Vehicle number 15 -- LEXUS: 523


# Analysis 5: For all the body styles involved in crashes, mention the top ethnic user group of each unique body style

#### We have the vehicle body style info in Units_use.csv and ethnicity info in Primary_person_use.csv, 
#### we already have this info in Units_use_df and Primary_person_use_df

In [18]:
w = Window.partitionBy("VEH_BODY_STYL_ID").orderBy(col("count").desc())

df = Units_use_df.join(Primary_person_use_df, Primary_person_use_df["CRASH_ID"] == Units_use_df["CRASH_ID"], how='inner'). \
    filter(~Units_use_df.VEH_BODY_STYL_ID.isin(["NA", "UNKNOWN", "NOT REPORTED",
                                                 "OTHER  (EXPLAIN IN NARRATIVE)"])). \
    filter(~Primary_person_use_df.PRSN_ETHNICITY_ID.isin(["NA", "UNKNOWN"])). \
    groupby("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID").count(). \
    withColumn("row", row_number().over(w)).filter(col("row") == 1).drop("row", "count")

In [19]:
# We need to join units_use_df and Primary_person_use_df
vehicle_and_ethnicity_info = Primary_person_use_df.join(Units_use_df, 
                                                        Primary_person_use_df["CRASH_ID"] == Units_use_df["CRASH_ID"],
                                                        how="inner").\
                             select(Units_use_df["VEH_BODY_STYL_ID"], Primary_person_use_df["PRSN_ETHNICITY_ID"])
vehicle_and_ethnicity_info.show(10, truncate=False)

+---------------------+-----------------+
|VEH_BODY_STYL_ID     |PRSN_ETHNICITY_ID|
+---------------------+-----------------+
|PASSENGER CAR, 4-DOOR|HISPANIC         |
|PASSENGER CAR, 4-DOOR|BLACK            |
|PASSENGER CAR, 4-DOOR|WHITE            |
|TRUCK                |BLACK            |
|TRUCK                |WHITE            |
|TRUCK                |BLACK            |
|TRUCK                |WHITE            |
|NA                   |BLACK            |
|NA                   |WHITE            |
|PASSENGER CAR, 4-DOOR|BLACK            |
+---------------------+-----------------+
only showing top 10 rows



In [20]:
vehicle_and_ethnicity_info.count()

367420

In [21]:
# Top ethnic user group of each unique body style
windowSpec  = Window.partitionBy("VEH_BODY_STYL_ID").orderBy(col("count").desc())

vehicle_and_ethnicity_df = vehicle_and_ethnicity_info.filter(
    vehicle_and_ethnicity_info["VEH_BODY_STYL_ID"].isin(["NA", "UNKNOWN", "NOT REPORTED",
                                                         "OTHER  (EXPLAIN IN NARRATIVE)"]) == False). \
            filter(vehicle_and_ethnicity_info["PRSN_ETHNICITY_ID"].isin(["NA", "UNKNOWN"]) == False)
vehicle_and_ethnicity_df.show(10, truncate=False)

+---------------------+-----------------+
|VEH_BODY_STYL_ID     |PRSN_ETHNICITY_ID|
+---------------------+-----------------+
|PASSENGER CAR, 4-DOOR|HISPANIC         |
|PASSENGER CAR, 4-DOOR|BLACK            |
|PASSENGER CAR, 4-DOOR|WHITE            |
|TRUCK                |BLACK            |
|TRUCK                |WHITE            |
|TRUCK                |BLACK            |
|TRUCK                |WHITE            |
|PASSENGER CAR, 4-DOOR|BLACK            |
|PASSENGER CAR, 4-DOOR|WHITE            |
|SPORT UTILITY VEHICLE|BLACK            |
+---------------------+-----------------+
only showing top 10 rows



In [22]:
vehicle_and_ethnicity_df = vehicle_and_ethnicity_df.\
            groupby("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID").count().\
            withColumn("row_num", row_number().over(windowSpec)).filter(col("row_num") == 1)

vehicle_and_ethnicity_df.select(col("VEH_BODY_STYL_ID"), col("PRSN_ETHNICITY_ID")).show(10)

+--------------------+-----------------+
|    VEH_BODY_STYL_ID|PRSN_ETHNICITY_ID|
+--------------------+-----------------+
|           AMBULANCE|            WHITE|
|                 BUS|         HISPANIC|
|      FARM EQUIPMENT|            WHITE|
|          FIRE TRUCK|            WHITE|
|          MOTORCYCLE|            WHITE|
|NEV-NEIGHBORHOOD ...|            WHITE|
|PASSENGER CAR, 2-...|            WHITE|
|PASSENGER CAR, 4-...|            WHITE|
|              PICKUP|            WHITE|
|    POLICE CAR/TRUCK|            WHITE|
+--------------------+-----------------+
only showing top 10 rows



In [23]:
w = Window.partitionBy("VEH_BODY_STYL_ID").orderBy(col("count").desc())

df = Units_use_df.join(Primary_person_use_df, Primary_person_use_df["CRASH_ID"] == Units_use_df["CRASH_ID"], how='inner'). \
    filter(~Units_use_df.VEH_BODY_STYL_ID.isin(["NA", "UNKNOWN", "NOT REPORTED",
                                                 "OTHER  (EXPLAIN IN NARRATIVE)"])). \
    filter(~Primary_person_use_df.PRSN_ETHNICITY_ID.isin(["NA", "UNKNOWN"])). \
    groupby("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID").count(). \
    withColumn("row", row_number().over(w)).filter(col("row") == 1).drop("row", "count")

In [24]:
df.show(truncate=False)

+---------------------------------+-----------------+
|VEH_BODY_STYL_ID                 |PRSN_ETHNICITY_ID|
+---------------------------------+-----------------+
|AMBULANCE                        |WHITE            |
|BUS                              |HISPANIC         |
|FARM EQUIPMENT                   |WHITE            |
|FIRE TRUCK                       |WHITE            |
|MOTORCYCLE                       |WHITE            |
|NEV-NEIGHBORHOOD ELECTRIC VEHICLE|WHITE            |
|PASSENGER CAR, 2-DOOR            |WHITE            |
|PASSENGER CAR, 4-DOOR            |WHITE            |
|PICKUP                           |WHITE            |
|POLICE CAR/TRUCK                 |WHITE            |
|POLICE MOTORCYCLE                |HISPANIC         |
|SPORT UTILITY VEHICLE            |WHITE            |
|TRUCK                            |WHITE            |
|TRUCK TRACTOR                    |WHITE            |
|VAN                              |WHITE            |
|YELLOW SCHOOL BUS          

# Analysis 6: Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash (Use Driver Zip Code)

#### According to data dictionary - Primary_person_use.csv has the Car crash info with driver zip code and alcohol result - DRVR_ZIP and PRSN_ALC_RSLT_ID, contributing factors are available in Units_use.csv
#### We already have the DFs available as Primary_person_use_df and Units_use_df

In [25]:
crashes_due_to_alcohol_df = Units_use_df.join(Primary_person_use_df, Units_use_df["CRASH_ID"] == Primary_person_use_df["CRASH_ID"], how='inner') \
            .dropna(subset=["DRVR_ZIP"])\
            .filter(col("CONTRIB_FACTR_1_ID").like("%ALCOHOL%") | col("CONTRIB_FACTR_2_ID").like("%ALCOHOL%")
                  | (col("PRSN_ALC_RSLT_ID") == "Positive")) \
            .groupby("DRVR_ZIP").count().orderBy(col("count").desc()).limit(5)
crashes_due_to_alcohol_df.show()

+--------+-----+
|DRVR_ZIP|count|
+--------+-----+
|   78521|  144|
|   76010|  131|
|   79936|   91|
|   79938|   82|
|   78753|   81|
+--------+-----+



# Analysis 7: Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance

#### According to data dictionary - Damages_use.csv has the Car crash info with damages, Units_use.csv has info regarding Damage level - VEH_DMAG_SCL, Insurance
#### We already have the DFs available as Primary_person_use_df and Units_use_df

In [26]:
# Read the Primary_Person_use.csv
Damages_use_df = spark.read.csv("Data/Damages_use.csv", header=True, inferSchema=True)
Damages_use_df.cache()
Damages_use_df.head(5)
Damages_use_df.printSchema()

root
 |-- CRASH_ID: integer (nullable = true)
 |-- DAMAGED_PROPERTY: string (nullable = true)



In [27]:
damage_insurance_info = Damages_use_df.join(Units_use_df, on=["CRASH_ID"], how='inner')
damage_insurance_info.select(col("CRASH_ID"), col("DAMAGED_PROPERTY")).show(10, truncate=False)


+--------+---------------------------------+
|CRASH_ID|DAMAGED_PROPERTY                 |
+--------+---------------------------------+
|14768622|YARD, GRASS                      |
|14768622|MAILBOX                          |
|14838668|GUARDRAIL                        |
|14838685|ROAD SIGN                        |
|14838693|2009 MAZDA 3                     |
|14838834|CHAIN LINK FENCE                 |
|14838834|CHAIN LINK FENCE                 |
|14838834|CHAIN LINK FENCE                 |
|14838834|CHAIN LINK FENCE                 |
|14838841|WOODED POLE ON SOUTH SIDE OF LOOP|
+--------+---------------------------------+
only showing top 10 rows



In [28]:
# Unique damage types
damage_insurance_info.groupBy("VEH_DMAG_SCL_1_ID").count().show()

+-----------------+-----+
|VEH_DMAG_SCL_1_ID|count|
+-----------------+-----+
|        DAMAGED 4| 6142|
|               NA| 2420|
|        DAMAGED 5| 3590|
|DAMAGED 1 MINIMUM| 4731|
|        DAMAGED 3| 9375|
|        NO DAMAGE|  560|
|DAMAGED 7 HIGHEST| 1410|
|        DAMAGED 2| 7539|
|        DAMAGED 6| 1577|
+-----------------+-----+



In [29]:
# Unique damage types
damage_insurance_info.groupBy("VEH_DMAG_SCL_2_ID").count().show()

+-----------------+-----+
|VEH_DMAG_SCL_2_ID|count|
+-----------------+-----+
|        DAMAGED 4| 1642|
|               NA|23922|
|        DAMAGED 5|  901|
|DAMAGED 1 MINIMUM| 2444|
|        DAMAGED 3| 3132|
|        NO DAMAGE|  679|
|DAMAGED 7 HIGHEST|  498|
|        DAMAGED 2| 3679|
|        DAMAGED 6|  447|
+-----------------+-----+



In [30]:
# Unique damage types
damage_insurance_info.groupBy("FIN_RESP_TYPE_ID").count().show(truncate=False)

+----------------------------------------+-----+
|FIN_RESP_TYPE_ID                        |count|
+----------------------------------------+-----+
|INSURANCE BINDER                        |58   |
|LIABILITY INSURANCE POLICY              |6098 |
|NA                                      |7128 |
|CERTIFICATE OF SELF-INSURANCE           |224  |
|CERTIFICATE OF DEPOSIT WITH COMPTROLLER |6    |
|PROOF OF LIABILITY INSURANCE            |23822|
|SURETY BOND                             |7    |
|CERTIFICATE OF DEPOSIT WITH COUNTY JUDGE|1    |
+----------------------------------------+-----+



In [31]:
# Unique damage types
damage_insurance_info.filter(
    (col("DAMAGED_PROPERTY").like("NO DAMAGE%")) | (col("DAMAGED_PROPERTY").like("NONE%"))
) \
    .groupBy("DAMAGED_PROPERTY").count().show(1000, truncate=False)




+----------------------------------+-----+
|DAMAGED_PROPERTY                  |count|
+----------------------------------+-----+
|NO DAMAGES TO THE CITY POLE 214385|1    |
|NONE                              |209  |
|NONE1                             |2    |
|NO DAMAGE, SIGNAL LIGHT POLE      |2    |
|NO DAMAGE TO BARRICADE            |1    |
|NO DAMAGE TO FENCE                |1    |
+----------------------------------+-----+



In [32]:
damage_insurance_info.count()

37344

In [33]:
# FILTERING:  No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance
no_damage_insurace_availed_df = damage_insurance_info. \
    filter(
            ((col("VEH_DMAG_SCL_1_ID") > "DAMAGED 4") & (col("VEH_DMAG_SCL_1_ID").isin(["NA", "NO DAMAGE"]) == False)) 
                                           | 
            ((col("VEH_DMAG_SCL_2_ID") > "DAMAGED 4") & (col("VEH_DMAG_SCL_2_ID").isin(["NA", "NO DAMAGE"]) == False))
          ) \
    .filter((col("DAMAGED_PROPERTY") == "NONE") | (col("DAMAGED_PROPERTY").like("NO DAMAGE%"))) \
    .filter(col("FIN_RESP_TYPE_ID").like("%LIABILITY INSURANCE POLICY%"))

In [34]:
no_damage_insurace_availed_df.select(col("CRASH_ID"), col("VEH_DMAG_SCL_1_ID"), col("VEH_DMAG_SCL_2_ID"), 
                                    col("FIN_RESP_TYPE_ID"), col("DAMAGED_PROPERTY")).show(truncate=False)

+--------+-----------------+-----------------+--------------------------+----------------+
|CRASH_ID|VEH_DMAG_SCL_1_ID|VEH_DMAG_SCL_2_ID|FIN_RESP_TYPE_ID          |DAMAGED_PROPERTY|
+--------+-----------------+-----------------+--------------------------+----------------+
|14870169|DAMAGED 7 HIGHEST|NA               |LIABILITY INSURANCE POLICY|NONE            |
|14885395|DAMAGED 6        |NA               |LIABILITY INSURANCE POLICY|NONE            |
|14885395|DAMAGED 6        |NA               |LIABILITY INSURANCE POLICY|NONE            |
|15255328|DAMAGED 5        |NA               |LIABILITY INSURANCE POLICY|NONE            |
+--------+-----------------+-----------------+--------------------------+----------------+



In [35]:
print(f"Total number of Distinct Crash IDs where No Damaged Property was observed and Damage Level is above 4 and car avails Insurance: {no_damage_insurace_availed_df.count()}")

Total number of Distinct Crash IDs where No Damaged Property was observed and Damage Level is above 4 and car avails Insurance: 4


# Analysis 8: Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, uses top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)

#### According to data dictionary, we have the Vehicle info in Units_use.csv, Driver details in primary_peron_use.cs
#### We already have the DFs available as Primary_person_use_df and Units_use_df

In [36]:
# Top 10 used vehicle colors
vehicle_colors_df = Units_use_df.groupby(col("VEH_COLOR_ID")).count().orderBy(col("count").desc())

# top 10
top_ten_vehicle_colors = vehicle_colors_df.limit(10)
top_ten_vehicle_colors.show()

+------------+-----+
|VEH_COLOR_ID|count|
+------------+-----+
|         WHI|38354|
|         BLK|27749|
|         SIL|20777|
|         GRY|18174|
|         BLU|15471|
|         RED|14095|
|         GRN| 6767|
|          NA| 6193|
|         MAR| 6010|
|         TAN| 4846|
+------------+-----+



In [37]:
# Filtering null values from top_ten_vehicle_colors
vehicle_colors_df = vehicle_colors_df.filter((col("VEH_COLOR_ID") == "NA") == False)
top_ten_vehicle_colors = vehicle_colors_df.limit(10)
top_ten_vehicle_colors.show()

+------------+-----+
|VEH_COLOR_ID|count|
+------------+-----+
|         WHI|38354|
|         BLK|27749|
|         SIL|20777|
|         GRY|18174|
|         BLU|15471|
|         RED|14095|
|         GRN| 6767|
|         MAR| 6010|
|         TAN| 4846|
|         GLD| 4062|
+------------+-----+



In [38]:
# Top 25 states 
vehicle_state_df = Units_use_df.groupBy(col("VEH_LIC_STATE_ID")).count().orderBy(col("count").desc())
vehicle_state_df.show(25)


+----------------+------+
|VEH_LIC_STATE_ID| count|
+----------------+------+
|              TX|156997|
|              NA|  6065|
|              UN|  1791|
|              OK|   970|
|              LA|   782|
|              NM|   731|
|              IN|   544|
|              MX|   493|
|              CA|   482|
|              FL|   416|
|              IL|   370|
|              AR|   350|
|              TN|   304|
|              MS|   244|
|              AZ|   236|
|              KS|   186|
|              GA|   177|
|              MO|   177|
|              CO|   175|
|              NC|   159|
|              AL|   145|
|              OH|   127|
|              MI|   122|
|              MN|   113|
|              98|   109|
+----------------+------+
only showing top 25 rows



In [39]:

vehicle_state_df.dtypes

[('VEH_LIC_STATE_ID', 'string'), ('count', 'bigint')]

In [40]:
# Filter 98 and other such bad records in vehicle_state_df
vehicle_state_df.filter(col("VEH_LIC_STATE_ID").cast("int").isNotNull()).show(25)

+----------------+-----+
|VEH_LIC_STATE_ID|count|
+----------------+-----+
|              98|  109|
+----------------+-----+



In [41]:
# We only need Vehicle state ID 
vehicle_state_df.filter(col("VEH_LIC_STATE_ID").cast("int").isNull()).show(25)

+----------------+------+
|VEH_LIC_STATE_ID| count|
+----------------+------+
|              TX|156997|
|              NA|  6065|
|              UN|  1791|
|              OK|   970|
|              LA|   782|
|              NM|   731|
|              IN|   544|
|              MX|   493|
|              CA|   482|
|              FL|   416|
|              IL|   370|
|              AR|   350|
|              TN|   304|
|              MS|   244|
|              AZ|   236|
|              KS|   186|
|              MO|   177|
|              GA|   177|
|              CO|   175|
|              NC|   159|
|              AL|   145|
|              OH|   127|
|              MI|   122|
|              MN|   113|
|              WI|   101|
+----------------+------+
only showing top 25 rows



In [42]:
# Speeding related offenses are present in Charges_use.csv

# Read the Charges_use.csv
Charges_use_df = spark.read.csv("Data/Charges_use.csv", header=True, inferSchema=True)
Charges_use_df.cache()
Charges_use_df.head(5)
Charges_use_df.printSchema()

root
 |-- CRASH_ID: integer (nullable = true)
 |-- UNIT_NBR: integer (nullable = true)
 |-- PRSN_NBR: integer (nullable = true)
 |-- CHARGE: string (nullable = true)
 |-- CITATION_NBR: string (nullable = true)



In [43]:
speeding_related_offenses_df = Charges_use_df.filter(col("CHARGE").like("%SPEED%"))
speeding_related_offenses_df.show(10, truncate=False)

+--------+--------+--------+--------------------------------+------------+
|CRASH_ID|UNIT_NBR|PRSN_NBR|CHARGE                          |CITATION_NBR|
+--------+--------+--------+--------------------------------+------------+
|14838834|1       |1       |FAIL TO CONTROL SPEED           |10019200    |
|14838972|2       |1       |FAIL TO CONTROL SPEED           |E010515     |
|14838977|1       |1       |UNSAFE SPEED                    |TS00132638  |
|14839233|1       |1       |UNSAFE SPEED                    |TX4IAC0VAP35|
|14839240|1       |1       |FAILED TO CONTROL SPEED ACCIDENT|138958691   |
|14839272|1       |1       |FAIL TO CONTROL SPEED           |AT10352542  |
|14839342|1       |1       |FAILED TO CONTROL SPEED         |E0021752    |
|14839404|1       |1       |FAILED TO CONTROL SPEED         |138474077   |
|14839510|1       |1       |FAIL TO CONTROL SPEED           |10330959    |
|14839519|1       |1       |UNSAFE SPEED                    |TX4IC50PTGSW|
+--------+--------+------

In [44]:
# Drivers info
drivers_ID_info_df = Primary_person_use_df.groupBy(col("DRVR_LIC_TYPE_ID")).count()
drivers_ID_info_df.show(truncate=False)

+----------------------+------+
|DRVR_LIC_TYPE_ID      |count |
+----------------------+------+
|NA                    |866   |
|COMMERCIAL DRIVER LIC.|7204  |
|ID CARD               |11924 |
|UNKNOWN               |2268  |
|OCCUPATIONAL          |124   |
|UNLICENSED            |8988  |
|OTHER                 |354   |
|DRIVER LICENSE        |125226|
+----------------------+------+



In [45]:
# Drivers with license
drivers_with_license_df = Primary_person_use_df.filter(col("DRVR_LIC_TYPE_ID").isin(["DRIVER LICENSE", "COMMERCIAL DRIVER LIC."]))
# drivers_with_license_df.show(10)

In [46]:
speeding_related_offenses_df.columns

['CRASH_ID', 'UNIT_NBR', 'PRSN_NBR', 'CHARGE', 'CITATION_NBR']

In [47]:
drivers_with_license_df.columns

['CRASH_ID',
 'UNIT_NBR',
 'PRSN_NBR',
 'PRSN_TYPE_ID',
 'PRSN_OCCPNT_POS_ID',
 'PRSN_INJRY_SEV_ID',
 'PRSN_AGE',
 'PRSN_ETHNICITY_ID',
 'PRSN_GNDR_ID',
 'PRSN_EJCT_ID',
 'PRSN_REST_ID',
 'PRSN_AIRBAG_ID',
 'PRSN_HELMET_ID',
 'PRSN_SOL_FL',
 'PRSN_ALC_SPEC_TYPE_ID',
 'PRSN_ALC_RSLT_ID',
 'PRSN_BAC_TEST_RSLT',
 'PRSN_DRG_SPEC_TYPE_ID',
 'PRSN_DRG_RSLT_ID',
 'DRVR_DRG_CAT_1_ID',
 'PRSN_DEATH_TIME',
 'INCAP_INJRY_CNT',
 'NONINCAP_INJRY_CNT',
 'POSS_INJRY_CNT',
 'NON_INJRY_CNT',
 'UNKN_INJRY_CNT',
 'TOT_INJRY_CNT',
 'DEATH_CNT',
 'DRVR_LIC_TYPE_ID',
 'DRVR_LIC_STATE_ID',
 'DRVR_LIC_CLS_ID',
 'DRVR_ZIP']

In [48]:
# Top 5 Vehicle Makes where drivers are charged with speeding related offences, 
# has licensed Drivers

final_df_with_vehicle_make_info_df = drivers_with_license_df.join(speeding_related_offenses_df, on=["CRASH_ID"]). \
                                    join(Units_use_df, on=["CRASH_ID"]).groupby("VEH_MAKE_ID").count(). \
                                    orderBy(col("count").desc()).limit(5) 
                                    


final_df_with_vehicle_make_info_df.show()

+-----------+-----+
|VEH_MAKE_ID|count|
+-----------+-----+
|       FORD|20396|
|  CHEVROLET|18228|
|     TOYOTA|12524|
|      DODGE| 8474|
|     NISSAN| 7720|
+-----------+-----+



In [49]:
# uses top 10 used vehicle colours and has car licensed 
# with the Top 25 states with highest number of offences

# Top 25 states
top_25_states = vehicle_state_df.collect()
top_25_states_list = [state_info[0] for state_info in top_25_states]
print(top_25_states_list)


# Top 10 colors
top_10_colors = top_ten_vehicle_colors.collect()
top_10_colors_list = [color_info[0] for color_info in top_10_colors]
print(top_10_colors_list)

['TX', 'NA', 'UN', 'OK', 'LA', 'NM', 'IN', 'MX', 'CA', 'FL', 'IL', 'AR', 'TN', 'MS', 'AZ', 'KS', 'MO', 'GA', 'CO', 'NC', 'AL', 'OH', 'MI', 'MN', '98', 'WI', 'VA', 'NE', 'WA', 'IA', 'UT', 'OR', 'PA', 'SC', 'CD', 'NY', 'KY', 'NV', 'MD', 'NJ', 'MT', 'ID', 'ND', 'CT', 'SD', 'WY', 'AK', 'MA', 'WV', 'AS', 'NH', 'ME', 'US', 'RI', 'VT', 'HI', 'DE', 'DS', 'DC']
['WHI', 'BLK', 'SIL', 'GRY', 'BLU', 'RED', 'GRN', 'MAR', 'TAN', 'GLD']


In [51]:
# Top 5 Vehicle Makes where drivers are charged with speeding related offences, 
# has licensed Drivers, uses top 10 used vehicle colours and 
# has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)

final_df_with_vehicle_make_info_df = drivers_with_license_df.join(speeding_related_offenses_df, on=["CRASH_ID"]). \
                                    join(Units_use_df, on=["CRASH_ID"]). \
                                    filter(Units_use_df["VEH_COLOR_ID"].isin(top_10_colors_list)). \
                                    filter(Units_use_df["VEH_LIC_STATE_ID"].isin(top_25_states_list)). \
                                    groupby("VEH_MAKE_ID").count(). \
                                    orderBy(col("count").desc())



                                    


final_df_with_vehicle_make_info_df = final_df_with_vehicle_make_info_df.limit(5)
final_df_with_vehicle_make_info_df.show()

+-----------+-----+
|VEH_MAKE_ID|count|
+-----------+-----+
|       FORD|19321|
|  CHEVROLET|16959|
|     TOYOTA|11889|
|      DODGE| 7980|
|     NISSAN| 7358|
+-----------+-----+

