In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [2]:
# Single SparkSession shared by every query cell below; getOrCreate() returns
# an existing session with this configuration if one is already running.
spark = (
    SparkSession.builder
    .appName("queryoutput")
    .getOrCreate()
)

In [3]:
# Directory that holds the CSV extracts; change it here to point at another copy.
INPUT_DIR = "../input_data"


def _load_csv(file_name, input_dir=INPUT_DIR):
    """Read one CSV extract with a header row and inferred column types.

    Exact duplicate rows are removed with ``distinct()`` so every downstream
    query works on unique records.

    Args:
        file_name: Name of the CSV file inside ``input_dir``.
        input_dir: Directory containing the extracts (defaults to ``INPUT_DIR``).

    Returns:
        A de-duplicated Spark DataFrame.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{input_dir}/{file_name}")
        .distinct()
    )


df_damages = _load_csv("Damages_use.csv")
df_endorse = _load_csv("Endorse_use.csv")
df_charges = _load_csv("Charges_use.csv")
df_restrict = _load_csv("Restrict_use.csv")
df_primperson = _load_csv("Primary_Person_use.csv")
df_units = _load_csv("Units_use.csv")

In [5]:
"""
1. Find the number of crashes (accidents) in which number of males killed are greater than 2?
"""
df = (df_primperson
              .filter((col('PRSN_GNDR_ID') == 'MALE') & (col('DEATH_CNT') == 1))
              .groupBy('CRASH_ID')
              .agg(sum(col("DEATH_CNT")).alias("NO_OF_MALES_KILLED"))
              .filter(col("NO_OF_MALES_KILLED") > 2)
              .agg(count("NO_OF_MALES_KILLED").alias("NO_OF_CRASHES_OF_MALE_DEATH_GT_2")))
df.show()

+--------------------------------+
|NO_OF_CRASHES_OF_MALE_DEATH_GT_2|
+--------------------------------+
|                               0|
+--------------------------------+



In [6]:
"""
2. How many two wheelers are booked for crashes?
"""
df = (df_units
              .filter(col('VEH_BODY_STYL_ID').isin("MOTORCYCLE", "POLICE MOTORCYCLE"))
              .select(countDistinct('VIN').alias('NO_OF_DISTINCT_TWO_WHEELERS')))
df.show()

+---------------------------+
|NO_OF_DISTINCT_TWO_WHEELERS|
+---------------------------+
|                        766|
+---------------------------+



In [7]:
"""
3. Determine the Top 5 Vehicle Makes of the cars present in the crashes in which driver died and Airbags did not deploy
"""
df = (df_primperson
              .join(df_units, on=['CRASH_ID', 'UNIT_NBR'], how="inner")
              .filter((col('PRSN_TYPE_ID').contains('DRIVER')) & 
                      (col('PRSN_AIRBAG_ID') == 'NOT DEPLOYED') &
                      (df_primperson.DEATH_CNT == 1) & 
                      (col('VEH_BODY_STYL_ID').contains('CAR')))
              .groupBy('VEH_MAKE_ID')
              .agg(countDistinct("VIN").alias('COUNT_OF_VEHICLES_PER_VEH_MAKE'))
              .orderBy(col('COUNT_OF_VEHICLES_PER_VEH_MAKE').desc(), col('VEH_MAKE_ID').asc())
              .limit(5))
df.show()

+-----------+------------------------------+
|VEH_MAKE_ID|COUNT_OF_VEHICLES_PER_VEH_MAKE|
+-----------+------------------------------+
|     NISSAN|                             4|
|  CHEVROLET|                             3|
|       FORD|                             2|
|      HONDA|                             2|
|      BUICK|                             1|
+-----------+------------------------------+



In [8]:
"""
4. Determine number of Vehicles with driver having valid licences involved in hit and run?
"""
        # Considering VALID DRIVER LICENSES as the ones with class ID in the list->
        # [CLASS A, CLASS B, CLASS C, CLASS M, CLASS A AND M, CLASS B AND M, CLASS C AND M]
df = (df_primperson
              .join(df_units, on=['CRASH_ID', 'UNIT_NBR'], how="inner")
              .filter((col('PRSN_TYPE_ID').contains('DRIVER')) & 
                      (col('DRVR_LIC_CLS_ID').contains('CLASS')) &
                      (col('VEH_HNR_FL') == 'Y'))
              .select(countDistinct("VIN").alias("NO_OF_VEHICLES")))
df.show()

+--------------+
|NO_OF_VEHICLES|
+--------------+
|          2365|
+--------------+



In [10]:
"""
5. Which state has highest number of accidents in which females are not involved?
"""
        # This query is written with assumption that the state in which accident occured 
        # and state of VEH_LIC_STATE_ID is same
df_crash_with_no_female = (
            df_primperson
            .withColumn("NO_OF_FEMALES", when(col("PRSN_GNDR_ID") == "FEMALE", 1).otherwise(0))
            .groupBy("CRASH_ID")
            .agg(sum(col("NO_OF_FEMALES")).alias("NO_OF_FEMALES_PER_CRASH"))
            .filter(col("NO_OF_FEMALES_PER_CRASH") == 0)
            .select("CRASH_ID")
        )

df_state_most_accidents = (
            df_crash_with_no_female
            .join(df_units, on="CRASH_ID", how="inner")
            .dropDuplicates(["CRASH_ID"])
            .groupBy("VEH_LIC_STATE_ID")
            .count()
            .orderBy(col("count").desc())
            .limit(1)
        )
df_state_most_accidents.show()

+----------------+-----+
|VEH_LIC_STATE_ID|count|
+----------------+-----+
|              TX|34757|
+----------------+-----+



In [11]:
"""
6. Which are the Top 3rd to 5th VEH_MAKE_IDs that contribute to a largest number of injuries including death
"""
w = Window.orderBy(col("TOTAL_INJURIES_VEH_MAKEID").desc())

df = (df_units
              .withColumn('TOTAL_INJURIES', col('TOT_INJRY_CNT') + col('DEATH_CNT'))
              .groupBy("VEH_MAKE_ID")
              .agg(sum("TOTAL_INJURIES").alias('TOTAL_INJURIES_VEH_MAKEID'))
              .withColumn("row", row_number().over(w))
              .filter(col("row").between(3, 5)))
df.show()

+-----------+-------------------------+---+
|VEH_MAKE_ID|TOTAL_INJURIES_VEH_MAKEID|row|
+-----------+-------------------------+---+
|     TOYOTA|                     4227|  3|
|      DODGE|                     3138|  4|
|     NISSAN|                     3114|  5|
+-----------+-------------------------+---+



In [12]:
"""
7. For all the body styles involved in crashes, mention the top ethnic user group of each unique body style
"""
w = Window.partitionBy("VEH_BODY_STYL_ID").orderBy(col("count").desc())

df = (df_primperson
              .join(df_units, on=['CRASH_ID', 'UNIT_NBR'], how='inner')
              .filter(~col('VEH_BODY_STYL_ID').isin(["NA", "UNKNOWN", "NOT REPORTED", "OTHER  (EXPLAIN IN NARRATIVE)"]) &
                      ~col('PRSN_ETHNICITY_ID').isin(["NA", "UNKNOWN"]))
              .groupBy("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID")
              .count()
              .withColumn("row", row_number().over(w))
              .filter(col("row") == 1)
              .drop("row", "count"))
df.show()

+--------------------+-----------------+
|    VEH_BODY_STYL_ID|PRSN_ETHNICITY_ID|
+--------------------+-----------------+
|           AMBULANCE|            WHITE|
|                 BUS|            BLACK|
|      FARM EQUIPMENT|            WHITE|
|          FIRE TRUCK|            WHITE|
|          MOTORCYCLE|            WHITE|
|NEV-NEIGHBORHOOD ...|            WHITE|
|PASSENGER CAR, 2-...|            WHITE|
|PASSENGER CAR, 4-...|            WHITE|
|              PICKUP|            WHITE|
|    POLICE CAR/TRUCK|            WHITE|
|   POLICE MOTORCYCLE|            WHITE|
|SPORT UTILITY VEH...|            WHITE|
|               TRUCK|            WHITE|
|       TRUCK TRACTOR|            WHITE|
|                 VAN|            WHITE|
|   YELLOW SCHOOL BUS|            BLACK|
+--------------------+-----------------+



In [13]:
"""
8. Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols
        as the contributing factor to a crash (Use Driver Zip Code)
"""
df = (
            df_units
            .join(df_primperson, on=["CRASH_ID","UNIT_NBR"], how="inner")
            .dropna(subset=["DRVR_ZIP"])
            .filter(
                col("CONTRIB_FACTR_1_ID").contains("ALCOHOL") |
                col("CONTRIB_FACTR_2_ID").contains("ALCOHOL") |
                col("CONTRIB_FACTR_P1_ID").contains("ALCOHOL")
            )
            .filter(col("VEH_BODY_STYL_ID").like("%CAR%"))
            .dropDuplicates(["CRASH_ID"])
            .groupby("DRVR_ZIP")
            .count()
            .orderBy(col("count").desc())
            .limit(5)
            .drop("count")
        )

df.show()

+--------+
|DRVR_ZIP|
+--------+
|   75052|
|   75067|
|   76010|
|   78521|
|   78130|
+--------+



In [14]:
"""
9. Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is
        above 4 and car avails Insurance
"""
damage_list_gt4=["DAMAGED 5","DAMAGED 6","DAMAGED 7 HIGHEST"]
df = (
            df_units.alias("u")
            .join(df_damages.alias("d"), on=["CRASH_ID"], how="left")
            .filter(
                (col("u.VEH_DMAG_SCL_1_ID").isin(damage_list_gt4))  |
                (col("u.VEH_DMAG_SCL_2_ID").isin(damage_list_gt4))
            )
            .filter((col("d.DAMAGED_PROPERTY") == "NONE") | (col("d.DAMAGED_PROPERTY").isNull()==True))
            .filter(col("u.FIN_RESP_TYPE_ID").like("%INSURANCE%"))
            .filter(col("u.VEH_BODY_STYL_ID").like("%CAR%"))
            .select(countDistinct("u.CRASH_ID").alias("COUNT_OF_DISTINCT_CRASH_IDS"))
        )
df.show()

+---------------------------+
|COUNT_OF_DISTINCT_CRASH_IDS|
+---------------------------+
|                       5446|
+---------------------------+



In [15]:
"""
10. Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences,
        has licensed Drivers, used top 10 used vehicle colours and has car licensed with the Top 25 states
        with highest number of offences (to be deduced from the data)
"""
top25_offences_state = [
            row[0] for row in df_units
            .join(df_charges, on=["CRASH_ID","UNIT_NBR"], how="inner")
            .filter(col("VEH_LIC_STATE_ID").cast("int").isNull())
            .dropDuplicates(["CRASH_ID","UNIT_NBR"])
            .groupby("VEH_LIC_STATE_ID")
            .count()
            .orderBy(col("count").desc())
            .limit(25)
            .collect()
        ]

top10_vehicle_colors = [
            row[0] for row in df_units
            .filter(col("VEH_COLOR_ID") != "NA")
            .dropDuplicates(["CRASH_ID","UNIT_NBR"])
            .groupby("VEH_COLOR_ID")
            .count()
            .orderBy(col("count").desc())
            .limit(10)
            .collect()
        ]

df = (
            df_charges
            .join(df_primperson, on=["CRASH_ID","UNIT_NBR"], how="inner")
            .join(df_units, on=["CRASH_ID","UNIT_NBR"], how="inner")
            .filter(df_charges.CHARGE.contains("SPEED"))
            .filter(df_primperson.DRVR_LIC_TYPE_ID.isin(["DRIVER LICENSE", "COMMERCIAL DRIVER LIC."]))
            .filter(df_units.VEH_COLOR_ID.isin(top10_vehicle_colors))
            .filter(df_units.VEH_LIC_STATE_ID.isin(top25_offences_state))
            .dropDuplicates(["CRASH_ID","UNIT_NBR"])
            .groupby("VEH_MAKE_ID")
            .count()
            .orderBy(col("count").desc())
            .limit(5)
            .drop("count")
        )
df.show()

+-----------+
|VEH_MAKE_ID|
+-----------+
|       FORD|
|  CHEVROLET|
|     TOYOTA|
|      DODGE|
|      HONDA|
+-----------+

