In [3]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=c8eb6eb2af00d0dcdb3f4dbfcd9a50045f6bcb6a502623c2b88e410692debca1
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [4]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
Source_file_path = '/content/drive/MyDrive/Case_study/'

In [6]:
# prompt: start the sparksession


spark = SparkSession.builder.appName("BCG_CASE_STUDY").getOrCreate()


In [7]:
charges_df =  spark.read.format("csv").option('Header',True).option('inferSchema',True).option('sep',',').load(Source_file_path+ 'Charges_use.csv',escape="\n",multiLine=True)

damages_df =  spark.read.format("csv").option('Header',True).option('inferSchema',True).option('sep',',').load(Source_file_path+ 'Damages_use.csv',escape="\n",multiLine=True)

endorse_df =  spark.read.format("csv").option('Header',True).option('inferSchema',True).option('sep',',').load(Source_file_path+ 'Endorse_use.csv',escape="\n",multiLine=True)

primary_person_df =  spark.read.format("csv").option('Header',True).option('inferSchema',True).option('sep',',').load(Source_file_path+ 'Primary_Person_use.csv',escape="\n",multiLine=True)

restrict_use_df =  spark.read.format("csv").option('Header',True).option('inferSchema',True).option('sep',',').load(Source_file_path+ 'Restrict_use.csv',escape="\n",multiLine=True)

unit_df =  spark.read.format("csv").option('Header',True).option('inferSchema',True).option('sep',',').load(Source_file_path+ 'Units_use.csv',escape="\n",multiLine=True)


In [9]:

#Analysis 1 : Find the number of crashes (accidents) in which number of males killed are greater than 2

num_males_killed_df = primary_person_df.filter(primary_person_df['PRSN_GNDR_ID'] == 'MALE') \
                                       .groupBy('CRASH_ID') \
                                       .agg(sum('DEATH_CNT').alias('MALE_DEATH_CNT')) \
                                       .filter('MALE_DEATH_CNT > 2').agg(count('CRASH_ID').alias('num_male_killed'))
#display(num_males_killed_df)
num_males_killed_df.show()

+---------------+
|num_male_killed|
+---------------+
|              0|
+---------------+



In [10]:
  # Analysis 2: How many two wheelers are booked for crashes?
two_wheelers_df = unit_df.filter(col('VEH_BODY_STYL_ID').like('%MOTORCYCLE%')).\
                          agg(count('CRASH_ID').alias('Motorcycle_crash_count'))
#display(two_wheelers_df)
two_wheelers_df.show()

+----------------------+
|Motorcycle_crash_count|
+----------------------+
|                   784|
+----------------------+



In [11]:
# Analysis 3: Determine the Top 5 Vehicle Makes of the cars present in the crashes in which driver died and Airbags did not deploy
unit_primary_joined_df = unit_df.alias('e1').join(primary_person_df.alias('e2'), on='CRASH_ID', how='inner')

filtered_df = unit_primary_joined_df.\
            filter((col('PRSN_TYPE_ID') == 'DRIVER') & (col('e1.DEATH_CNT') > 0) &
                   (col('PRSN_AIRBAG_ID') == 'NOT DEPLOYED')).\
            filter(~col('VEH_BODY_STYL_ID').like('%MOTORCYCLE%'))

top_5_vehicle_makes_df = filtered_df.groupBy('VEH_MAKE_ID').\
                                agg(count('e1.CRASH_ID').alias('crash_count')).\
                                orderBy(col('crash_count').desc()).\
                                limit(5)
#(top_5_vehicle_makes_df)
top_5_vehicle_makes_df.show()

+-----------+-----------+
|VEH_MAKE_ID|crash_count|
+-----------+-----------+
|         NA|         59|
|       FORD|         29|
|  CHEVROLET|         28|
|     NISSAN|         18|
|      HONDA|         10|
+-----------+-----------+



In [12]:
# Analysis 4: Determine number of Vehicles with driver having valid licences involved in hit and run

unit_primary_joined_df = unit_df.alias('e1').join(primary_person_df.alias('e2'), on='CRASH_ID', how='inner')
valid_licence_hit_run_df = unit_primary_joined_df.filter((col('e2.DRVR_LIC_TYPE_ID') == 'DRIVER LICENSE') &
                                            (col('e1.VEH_HNR_FL') == 'Y')).\
                                            agg(count('e1.CRASH_ID').alias('HNR_COUNT'))

#display(valid_licence_hit_run_df)
valid_licence_hit_run_df.show()

+---------+
|HNR_COUNT|
+---------+
|     6083|
+---------+



In [13]:
# Analysis 5: Which state has highest number of accidents in which females are not involved

state_highest_accidents_df =  primary_person_df.filter(col('PRSN_GNDR_ID') != 'FEMALE').\
                                groupBy('DRVR_LIC_STATE_ID').agg(count(col('CRASH_ID')).alias('No_of_accidents')).orderBy(col('No_of_accidents').desc()).limit(1)
#display(state_highest_accidents_df)
state_highest_accidents_df.show()

+-----------------+---------------+
|DRVR_LIC_STATE_ID|No_of_accidents|
+-----------------+---------------+
|            Texas|          83016|
+-----------------+---------------+



In [14]:
# Analysis 6: Which are the Top 3rd to 5th VEH_MAKE_IDs that contribute to a largest number of injuries including death

window_spec = Window.orderBy(col('INJURY_DEATH_CNT').desc())
large_no_of_injuries = unit_df.withColumn('INJURY_DEATH_CNT',col('TOT_INJRY_CNT')+ col('DEATH_CNT')).\
                        groupBy('VEH_MAKE_ID').agg(sum('INJURY_DEATH_CNT').alias('INJURY_DEATH_CNT')).orderBy(col('INJURY_DEATH_CNT').desc())
top_3_to_5_veh = large_no_of_injuries.withColumn('rank', dense_rank().over(window_spec)).filter(col('rank').between(3,5)).select('VEH_MAKE_ID','INJURY_DEATH_CNT')

#display(top_3_to_5_veh)
state_highest_accidents_df.show()


+-----------------+---------------+
|DRVR_LIC_STATE_ID|No_of_accidents|
+-----------------+---------------+
|            Texas|          83016|
+-----------------+---------------+



In [15]:
# Analysis 7: For all the body styles involved in crashes, mention the top ethnic user group of each unique body style


unit_primary_joined_df = unit_df.join(primary_person_df.alias('e2'), on='CRASH_ID', how='inner').alias('e1')
top_ethnic_user_group_df = unit_primary_joined_df.groupBy(col('VEH_BODY_STYL_ID'),col('PRSN_ETHNICITY_ID')).\
                            agg(count(col('e1.CRASH_ID')).alias('ETHNICITY_COUNT')).\
                            orderBy(col('ETHNICITY_COUNT').desc()).limit(1)

#display(top_ethnic_user_group_df)
top_ethnic_user_group_df.show()

+--------------------+-----------------+---------------+
|    VEH_BODY_STYL_ID|PRSN_ETHNICITY_ID|ETHNICITY_COUNT|
+--------------------+-----------------+---------------+
|PASSENGER CAR, 4-...|            WHITE|          58312|
+--------------------+-----------------+---------------+



In [16]:
# Analysis 8: Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash (Use Driver Zip Code)
top_5_zip_codes_alcohol_df = primary_person_df.filter(col('PRSN_ALC_RSLT_ID')== 'Positive').\
                                             groupBy('DRVR_ZIP').\
                                             agg(count(col('CRASH_ID')).alias('ACCIDENT_COUNT')).\
                                             orderBy(col('ACCIDENT_COUNT').desc()).\
                                             filter(col('DRVR_ZIP').isNotNull()).\
                                             limit(5)

#display(top_5_zip_codes_alcohol_df)
top_5_zip_codes_alcohol_df.show()

+--------+--------------+
|DRVR_ZIP|ACCIDENT_COUNT|
+--------+--------------+
|   78521|            62|
|   76010|            48|
|   79936|            42|
|   79938|            37|
|   79907|            34|
+--------+--------------+



In [20]:
# Analysis 9: Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance

distinct_crash_ids_df = unit_df.alias('e1').join(damages_df.alias('e3'), on ='CRASH_ID', how = 'inner').\
                        filter(col('e3.DAMAGED_PROPERTY').isin(['NONE','NONE1'])).\
                        filter(col('e1.FIN_RESP_PROOF_ID') == 1).\
                        filter(col('e1.VEH_DMAG_SCL_1_ID').isin(['DAMAGED 5','DAMAGED 6'])).\
                        agg(count('CRASH_ID').alias('NO_DAMAGE_COUNT'))

#display(distinct_crash_ids_df)
distinct_crash_ids_df.show()

+---------------+
|NO_DAMAGE_COUNT|
+---------------+
|             10|
+---------------+



In [18]:
# Analysis 10: Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, used top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)


top_25_states_df =  unit_df.groupBy(col('VEH_LIC_STATE_ID')).\
                            agg(count(col('CRASH_ID')).alias('crash_count')).\
                            orderBy(col('crash_count').desc()).\
                            limit(25)

top_10_vehicle_colours_df = unit_df.groupBy(col('VEH_COLOR_ID')).\
                                    agg(count(col('CRASH_ID')).alias('crash_count_colour')).\
                                    orderBy(col('crash_count_colour').desc()).\
                                    limit(10)



top_5_vehicle_makes_speed_df = (
    unit_df.alias('e1')
    .join(primary_person_df.alias('e2'), 'CRASH_ID', 'inner')
    .filter(col('e2.DRVR_LIC_TYPE_ID') == 'DRIVER LICENSE')
    .filter(col('e1.CONTRIB_FACTR_1_ID').like('%SPEED%'))
    .filter(
        (col('e1.VEH_LIC_STATE_ID').isin([row['VEH_LIC_STATE_ID'] for row in top_25_states_df.collect()])) &
        col('e1.VEH_COLOR_ID').isin([row['VEH_COLOR_ID'] for row in top_10_vehicle_colours_df.collect()])
    )
    .groupBy('VEH_MAKE_ID')
    .agg(count(col('e1.CRASH_ID')).alias('crash_count'))
    .orderBy(col('crash_count').desc())
    .limit(5)
)

#display(top_5_vehicle_makes_speed_df)
top_5_vehicle_makes_speed_df.show()

+-----------+-----------+
|VEH_MAKE_ID|crash_count|
+-----------+-----------+
|       FORD|       8445|
|  CHEVROLET|       7363|
|     TOYOTA|       4308|
|      DODGE|       3701|
|      HONDA|       2809|
+-----------+-----------+

