In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=525b3e39f227612acbf6a1530f2d7ce04b9d67c2359900628dd82f64573023d7
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.appName("case_study_solution").master("local[*]").getOrCreate()

In [35]:
data_source_path = 'data/Data/'
df_charge = spark.read.csv(data_source_path+"Charges_use.csv", header=True, inferSchema=True)
df_damage = spark.read.csv(data_source_path+"Damages_use.csv", header=True, inferSchema=True)
df_endorse = spark.read.csv(data_source_path+"Endorse_use.csv", header=True, inferSchema=True)
df_person = spark.read.csv(data_source_path+"Primary_Person_use.csv", header=True, inferSchema=True)
df_restrict = spark.read.csv(data_source_path+"Restrict_use.csv", header=True, inferSchema=True)
df_units = spark.read.csv(data_source_path+"Units_use.csv", header=True, inferSchema=True)

In [46]:
# Assuming these set of columns can uniquely identify a vehicle
veh_attribute_cols = ['veh_lic_state_id','vin','veh_mod_year','veh_make_id']

In [115]:
# Analysis Q1 ------------------------

df_person_male_killed_more_than_2 = df_person\
  .dropDuplicates(['crash_id', 'unit_nbr'])\
  .where((upper(col('prsn_gndr_id'))=='MALE')
  & (upper(col('prsn_injry_sev_id'))=='KILLED'))\
  .groupBy('crash_id').agg(count('*').alias('count_of_males_killed'))\
  .filter(col('count_of_males_killed')>2)

df_person_male_killed_more_than_2.count()

0

In [116]:
# Analysis Q2 -----------------------

df_units_2_wheeler = df_units\
  .dropDuplicates(['crash_id', 'unit_nbr'])\
  .where(upper(col('veh_body_styl_id')).contains('MOTORCYCLE'))\
  .dropDuplicates(veh_attribute_cols)

df_units_2_wheeler.count()

770

In [117]:
# Analysis Q3 -----------

df_units_filtered = df_units\
  .dropDuplicates(['crash_id', 'unit_nbr'])\
  .where(upper(col('veh_body_styl_id')).contains('CAR'))

df_person_filtered = df_person\
  .dropDuplicates(['crash_id', 'unit_nbr'])\
  .where((upper(col('prsn_type_id'))=='DRIVER')
  & (upper(col('prsn_airbag_id'))=='NOT DEPLOYED')
  & (upper(col('prsn_injry_sev_id'))=='KILLED'))

df_person_unit_merged = df_units_filtered\
  .join(df_person_filtered, ['crash_id', 'unit_nbr'], 'inner')\
  .dropDuplicates(veh_attribute_cols)\
  .groupBy('veh_make_id').count().orderBy(col('count').desc()).limit(5)

df_person_unit_merged.show()

+-----------+-----+
|veh_make_id|count|
+-----------+-----+
|     NISSAN|    4|
|  CHEVROLET|    3|
|       FORD|    2|
|      HONDA|    2|
|    PONTIAC|    1|
+-----------+-----+



In [118]:
# Analysis Q4 ------------

df_units_filtered = df_units\
  .dropDuplicates(['crash_id', 'unit_nbr'])\
  .where(upper(col('veh_hnr_fl'))=='Y')\
  .drop('death_cnt','incap_injry_cnt',
        'non_injry_cnt','nonincap_injry_cnt',
        'poss_injry_cnt','tot_injry_cnt','unkn_injry_cnt')
        # dropping the common columns in df_person & df_units


df_person_filtered = df_person\
  .dropDuplicates(['crash_id', 'unit_nbr'])\
  .where(upper(col('prsn_type_id')).contains('DRIVER')
  & (upper(col('drvr_lic_cls_id')).contains('CLASS')
  | upper(col('drvr_lic_cls_id')).contains('OTHER')))

df_person_unit_merged = df_units_filtered\
  .join(df_person_filtered, ['crash_id', 'unit_nbr'], 'inner')\
  .dropDuplicates(veh_attribute_cols)

df_person_unit_merged.count()

2502

In [119]:
# Analysis Q5 ------------------

df_person_filtered = df_person\
  .dropDuplicates(['crash_id', 'unit_nbr'])

df_crash_with_no_female = df_person_filtered\
  .withColumn('count_of_females',when(upper(col('PRSN_GNDR_ID'))=='FEMALE',1).otherwise(0))\
  .groupBy(col('crash_id'))\
  .agg(sum(col('count_of_females')).alias('count_of_females_per_crash_id'))\
  .filter(col('count_of_females_per_crash_id')==0)\

df_state_most_accidents = df_crash_with_no_female\
  .join(df_person_filtered,'crash_id', 'inner')\
  .groupBy('drvr_lic_state_id')\
  .agg(countDistinct('crash_id').alias('count_of_accidents'))\
  .orderBy(col("count_of_accidents").desc())

df_state_most_accidents.limit(1).show()

+-----------------+------------------+
|drvr_lic_state_id|count_of_accidents|
+-----------------+------------------+
|            Texas|             34842|
+-----------------+------------------+



In [67]:
# Analysis Q6 ------------

df_death_plus_injury_counts = df_units\
  .dropDuplicates(['crash_id','unit_nbr'])\
  .withColumn("death_plus_injury_cnt", col("tot_injry_cnt") + col("death_cnt"))\
  .groupBy("veh_make_id")\
  .agg(sum(col('death_plus_injury_cnt')).alias('death_plus_injry_cnt_per_veh_make_id'))

window_spec = Window.orderBy(col('death_plus_injry_cnt_per_veh_make_id').desc())

df_top_3_to_5_make_id = df_death_plus_injury_counts\
  .withColumn('rn', row_number().over(window_spec))\
  .orderBy(col('death_plus_injry_cnt_per_veh_make_id').desc())\
  .filter(col("rn").isin(3,4,5))\
  .drop('rn')

df_top_3_to_5_make_id.show()

+-----------+------------------------------------+
|veh_make_id|death_plus_injry_cnt_per_veh_make_id|
+-----------+------------------------------------+
|     TOYOTA|                                4227|
|      DODGE|                                3136|
|     NISSAN|                                3113|
+-----------+------------------------------------+



In [120]:
# Analysis Q7 ----------

df_units_filtered = df_units\
  .dropDuplicates(['crash_id','unit_nbr'])\
  .where(~upper(col('veh_body_styl_id')).isin('NA', 'UNKNOWN', 'NOT REPORTED'))

df_person_filtered = df_person\
  .dropDuplicates(['crash_id','unit_nbr'])\
  .where(~upper(col('prsn_ethnicity_id')).isin('NA', 'UNKNOWN'))

df_person_unit_merged = df_units_filtered\
  .join(df_person_filtered, ['crash_id', 'unit_nbr'], 'inner')\
  .groupBy(['veh_body_styl_id', 'prsn_ethnicity_id'])\
  .agg(count('*').alias('count'))

window_spec = Window.partitionBy('veh_body_styl_id').orderBy(col('count').desc())

df_top_ethnic_user_group = df_person_unit_merged\
  .withColumn('rn', row_number().over(window_spec))\
  .filter(col('rn')==1).drop('rn')\
  .orderBy(col('count').desc())

df_top_ethnic_user_group.show(truncate=False)

+---------------------------------+-----------------+-----+
|veh_body_styl_id                 |prsn_ethnicity_id|count|
+---------------------------------+-----------------+-----+
|PASSENGER CAR, 4-DOOR            |WHITE            |25341|
|PICKUP                           |WHITE            |18959|
|SPORT UTILITY VEHICLE            |WHITE            |15484|
|PASSENGER CAR, 2-DOOR            |WHITE            |4711 |
|VAN                              |WHITE            |2179 |
|TRUCK                            |WHITE            |1482 |
|TRUCK TRACTOR                    |WHITE            |1278 |
|MOTORCYCLE                       |WHITE            |494  |
|POLICE CAR/TRUCK                 |WHITE            |182  |
|OTHER  (EXPLAIN IN NARRATIVE)    |WHITE            |161  |
|BUS                              |BLACK            |87   |
|YELLOW SCHOOL BUS                |BLACK            |53   |
|AMBULANCE                        |WHITE            |48   |
|FIRE TRUCK                       |WHITE

In [121]:
# Analysis Q8 -----------

df_person_filtered = df_person\
  .dropDuplicates(['crash_id','unit_nbr'])\
  .where(col('DRVR_ZIP').isNotNull() & ~upper(col('drvr_zip')).isin('UNKNOWN'))

df_units_filtered = df_units\
  .dropDuplicates(['crash_id','unit_nbr'])

df_person_unit_merged = df_units_filtered\
  .join(df_person_filtered, ['crash_id', 'unit_nbr'], 'inner')\
  .where(upper(col('contrib_factr_1_id')).contains('ALCOHOL')
  | upper(col('contrib_factr_2_id')).contains('ALCOHOL')
  | upper(col('contrib_factr_p1_id')).contains('ALCOHOL')
  | upper(col('contrib_factr_1_id')).contains('DRINKING')
  | upper(col('contrib_factr_2_id')).contains('DRINKING')
  | upper(col('contrib_factr_p1_id')).contains('DRINKING')
  | upper(col("prsn_alc_rslt_id")).contains('POSITIVE'))\
  .groupBy('drvr_zip')\
  .agg(countDistinct('crash_id').alias('crash_count'))\
  .orderBy(col('crash_count').desc()).limit(5)

df_person_unit_merged.show()

+--------+-----------+
|drvr_zip|crash_count|
+--------+-----------+
|   78521|         76|
|   78130|         57|
|   78550|         56|
|   78741|         55|
|   76010|         54|
+--------+-----------+



In [129]:
# Analysis Q9 ---------------

df_units_filtered = df_units\
  .dropDuplicates(['crash_id','unit_nbr'])

df_damage_filtered = df_damage\
  .dropDuplicates(['crash_id'])

df_units_damage_merged = df_units_filtered\
  .join(df_damage_filtered,'crash_id','inner')\
  .where(upper(col('damaged_property')).contains('NONE')
  | upper(col('damaged_property')).contains('NO DAMAGE'))\
  .filter(((upper(col('veh_dmag_scl_1_id')) > "DAMAGED 4") & (~upper(col('veh_dmag_scl_1_id')).isin("NA", "NO DAMAGE", "INVALID VALUE")))
  | ((upper(col('veh_dmag_scl_2_id')) > "DAMAGED 4") & (~upper(col('veh_dmag_scl_2_id')).isin("NA", "NO DAMAGE", "INVALID VALUE"))))\
  .filter(upper(col('fin_resp_type_id')).contains('INSURANCE'))\
  .dropDuplicates(['crash_id'])\
  .orderBy(col("crash_id").desc())

df_units_damage_merged.count()

10

In [133]:
# Analysis Q10 ------------------

df_units_filtered = df_units\
  .dropDuplicates(['crash_id', 'unit_nbr'])

df_charge_filtered = df_charge\
  .dropDuplicates(['crash_id', 'unit_nbr', 'prsn_nbr'])

df_person_filtered = df_person\
  .dropDuplicates(['crash_id', 'unit_nbr'])

df_top_10_vehicle_colors = df_units_filtered\
  .where(~col('veh_color_id').isin('NA')
   & col('veh_color_id').cast("int").isNull())\
  .groupBy('veh_color_id').agg(count('*').alias('color_count')).orderBy(col('color_count').desc()).limit(10)

top_10_vehicle_colors = [row["veh_color_id"] for row in df_top_10_vehicle_colors.collect()]
print(top_10_vehicle_colors)

df_top_25_states = df_units_filtered\
  .where(~col('veh_lic_state_id').isin('NA')\
  & col('veh_lic_state_id').cast("int").isNull())\
  .groupBy('veh_lic_state_id').agg(count('*').alias('state_count')).orderBy(col('state_count').desc()).limit(25)

top_25_states = [row["veh_lic_state_id"] for row in df_top_25_states.collect()]
print(top_25_states)

df_charge_person_units_merged = df_charge_filtered\
  .join(df_person_filtered, ['crash_id', 'unit_nbr', 'prsn_nbr'], 'inner')\
  .join(df_units_filtered, ['crash_id', 'unit_nbr'], 'inner')\
  .where(col('veh_color_id').isin(top_10_vehicle_colors)
  & col('veh_lic_state_id').isin(top_25_states)
  & upper(col('prsn_type_id')).contains('DRIVER')
  & (upper(col('drvr_lic_cls_id')).contains('CLASS') | upper(col('drvr_lic_cls_id')).contains('OTHER'))
  & upper(col('charge')).contains('SPEED'))\
  .groupBy('veh_make_id').agg(count('*').alias('offence_count'))\
  .orderBy(col('offence_count').desc()).limit(5)

df_charge_person_units_merged.show()

['WHI', 'BLK', 'SIL', 'GRY', 'BLU', 'RED', 'GRN', 'MAR', 'TAN', 'GLD']
['TX', 'UN', 'OK', 'LA', 'NM', 'MX', 'CA', 'FL', 'AR', 'IN', 'IL', 'TN', 'AZ', 'MS', 'CO', 'GA', 'KS', 'MO', 'NC', 'AL', 'OH', 'MI', 'MN', 'VA', 'WI']
+-----------+-------------+
|veh_make_id|offence_count|
+-----------+-------------+
|       FORD|         3871|
|  CHEVROLET|         3372|
|     TOYOTA|         2108|
|      DODGE|         1752|
|      HONDA|         1290|
+-----------+-------------+

