In [None]:
from pyspark.sql import SparkSession
# Single shared Spark session for every analysis in this notebook
# (getOrCreate reuses an already-running session if there is one).
spark = (
    SparkSession.builder
    .appName("Dynamic")
    .getOrCreate()
)

In [None]:
# Import the DataProcessor setup class, which reads the input data and writes the output data as configured in the config file
from src.setup import  DataProcessor


PROBLEM 1 - Find the number of crashes (accidents) in which the number of males killed is greater than 2.

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import count

class CrashAnalysis:
    """Count crashes in which more than ``threshold`` persons of a given gender died.

    Works on the person-level DataFrame: rows are matched on PRSN_GNDR_ID and
    DEATH_CNT, then grouped by CRASH_ID.
    """

    def __init__(self, primary_person_df: DataFrame):
        # Person-level crash records (must expose CRASH_ID, PRSN_GNDR_ID, DEATH_CNT).
        self.primary_person_df = primary_person_df

    def filter_data(self, gender: str = 'MALE') -> DataFrame:
        """Keep rows whose PRSN_GNDR_ID equals ``gender`` and whose DEATH_CNT equals 1."""
        people = self.primary_person_df
        is_gender = people["PRSN_GNDR_ID"] == gender
        is_dead = people.DEATH_CNT == 1
        return people.filter(is_gender & is_dead)

    def count_per_crash(self, filtered_df: DataFrame) -> DataFrame:
        """Return one row per CRASH_ID with the number of rows as ``death_per_crash``."""
        per_crash = filtered_df.groupBy("CRASH_ID")
        return per_crash.agg(count("*").alias("death_per_crash"))

    def filter_by_threshold(self, count_df: DataFrame, threshold: int) -> DataFrame:
        """Keep crashes whose ``death_per_crash`` is strictly greater than ``threshold``."""
        above_threshold = count_df.death_per_crash > threshold
        return count_df.filter(above_threshold)

    def get_count(self, filtered_count_df: DataFrame) -> int:
        """Return the number of rows (crashes) in ``filtered_count_df``."""
        return filtered_count_df.count()

    def run_analysis(self, gender: str = 'MALE', threshold: int = 2) -> int:
        """Run filter -> per-crash count -> threshold and return the crash count (an int)."""
        per_crash_counts = self.count_per_crash(self.filter_data(gender))
        return self.get_count(self.filter_by_threshold(per_crash_counts, threshold))




0


PROBLEM 2 - How many two-wheelers are booked for crashes?

In [None]:
from pyspark.sql.functions import instr, lower

class TwoWheelerCounter:
    """Select the unit rows that are two-wheelers (motorcycles or pedal cyclists)."""

    def __init__(self, units_df):
        # Unit-level crash records (must expose VEH_BODY_STYL_ID and UNIT_DESC_ID).
        self.units_df = units_df

    def two_wheelers(self):
        """Return unit rows whose body style mentions "motorcycle" (case-insensitive)
        or whose UNIT_DESC_ID is exactly "PEDALCYCLIST".

        The result is a DataFrame; call ``.count()`` on it for the total.
        """
        units = self.units_df
        is_motorcycle = instr(lower(units.VEH_BODY_STYL_ID), "motorcycle") > 0
        is_pedalcyclist = units.UNIT_DESC_ID == "PEDALCYCLIST"
        return units.filter(is_motorcycle | is_pedalcyclist)


PROBLEM 3 - Determine the Top 5 Vehicle Makes of the cars present in the crashes in which the driver died and the airbags did not deploy.

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import instr, lower, count, dense_rank, col
from pyspark.sql.window import Window

class VehicleAirbagAnalysis:
    """Rank vehicle makes by matching person deaths (e.g. drivers whose airbag did not deploy)."""

    def __init__(self, primary_person_df: DataFrame, units_df: DataFrame):
        # Person-level and unit-level crash records, joined on (CRASH_ID, UNIT_NBR).
        self.primary_person_df = primary_person_df
        self.units_df = units_df

    def filter_and_aggregate(self, body_style_keyword: str, person_type: str, death_count: int, airbag_status: str, rank_threshold: int = 5) -> DataFrame:
        """Return at most ``rank_threshold`` vehicle makes with the most matching persons.

        A joined person/unit row matches when the unit's VEH_BODY_STYL_ID contains
        ``body_style_keyword`` (case-insensitive), PRSN_TYPE_ID == ``person_type``,
        DEATH_CNT == ``death_count`` and PRSN_AIRBAG_ID == ``airbag_status``.

        Returns:
            DataFrame(VEH_MAKE_ID, Death_per_vehicle_make_without_airbags) ordered by the
            count descending, with ties broken by VEH_MAKE_ID ascending.
        """
        # row_number instead of dense_rank: with dense_rank every make tied on a small
        # count shares a rank, so "top 5" returned every make with at least one death.
        from pyspark.sql.functions import row_number

        metric = "Death_per_vehicle_make_without_airbags"
        # Secondary key makes the cut-off deterministic when counts tie.
        window = Window.orderBy(col(metric).desc(), col("VEH_MAKE_ID").asc())

        return (
            self.primary_person_df
            .join(self.units_df, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(
                # The column is lower-cased, so lower-case the keyword too.
                (instr(lower(self.units_df.VEH_BODY_STYL_ID), body_style_keyword.lower()) > 0) &
                (self.primary_person_df.PRSN_TYPE_ID == person_type) &
                (self.primary_person_df.DEATH_CNT == death_count) &
                (self.primary_person_df.PRSN_AIRBAG_ID == airbag_status)
            )
            .groupBy(self.units_df.VEH_MAKE_ID)
            .agg(count("*").alias(metric))
            .withColumn("rank", row_number().over(window))
            .filter(col("rank") <= rank_threshold)
            .orderBy("rank")
            .drop("rank")
        )


+-----------+--------------------------------------+
|VEH_MAKE_ID|Death_per_vehicle_make_without_airbags|
+-----------+--------------------------------------+
|     NISSAN|                                     4|
|  CHEVROLET|                                     3|
|      HONDA|                                     2|
|       FORD|                                     2|
|   CADILLAC|                                     1|
+-----------+--------------------------------------+
only showing top 5 rows



PROBLEM 4 - Determine the number of vehicles whose drivers have valid licences and that were involved in hit-and-run crashes.

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

class DriverLicenseAnalysis:
    """Count vehicles with a given unit flag whose person holds a valid licence."""

    def __init__(self, primary_person_df: DataFrame, units_df: DataFrame):
        # Person-level and unit-level crash records, joined on (CRASH_ID, UNIT_NBR).
        self.primary_person_df = primary_person_df
        self.units_df = units_df

    def filter_and_count(self, person_type: str, veh_honor_flag: str, valid_lic_types: list, invalid_lic_classes: list) -> int:
        """Return the number of distinct (CRASH_ID, UNIT_NBR) vehicles that match.

        Args:
            person_type: required PRSN_TYPE_ID (e.g. "DRIVER").
            veh_honor_flag: required value of the unit's VEH_HNR_FL column.
            valid_lic_types: DRVR_LIC_TYPE_ID values accepted as a licence.
            invalid_lic_classes: DRVR_LIC_CLS_ID values that disqualify the licence.
        """
        persons, units = self.primary_person_df, self.units_df

        is_person_type = persons.PRSN_TYPE_ID == person_type
        is_flagged = units.VEH_HNR_FL == veh_honor_flag
        has_valid_licence = (
            persons.DRVR_LIC_TYPE_ID.isin(valid_lic_types)
            & ~persons.DRVR_LIC_CLS_ID.isin(invalid_lic_classes)
        )

        matched = persons.join(units, ["CRASH_ID", "UNIT_NBR"], "inner").filter(
            is_person_type & is_flagged & has_valid_licence
        )
        # One vehicle can appear on several person rows; count each vehicle once.
        return matched.select("CRASH_ID", "UNIT_NBR").distinct().count()


2450


PROBLEM 5 - Which state has the highest number of accidents in which females are not involved?

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import countDistinct, dense_rank, col
from pyspark.sql.window import Window

class AccidentAnalysis:
    """Rank groups (e.g. driver licence states) by crashes with no person of a given gender."""

    def __init__(self, primary_person_df: DataFrame):
        # Person-level crash records (must expose CRASH_ID, PRSN_GNDR_ID, PRSN_TYPE_ID).
        self.primary_person_df = primary_person_df

    def filter_and_aggregate(self, gender_exclusion: str, person_type: str, group_by_column: str, rank_column: str, rank_value: int = 1) -> DataFrame:
        """Return the group(s) at dense rank ``rank_value`` by number of qualifying crashes.

        A crash qualifies only if NO person in it (of any person type) has
        PRSN_GNDR_ID == ``gender_exclusion``. Within qualifying crashes, rows with
        PRSN_TYPE_ID == ``person_type`` are grouped by ``group_by_column`` and their
        distinct CRASH_IDs counted into ``rank_column``. Ties at the rank all returned.
        """
        persons = self.primary_person_df

        # Filtering individual rows is not enough: a crash with one male and one female
        # driver would still be counted. Remove every crash that involves the gender.
        crashes_with_excluded_gender = (
            persons.filter(col("PRSN_GNDR_ID") == gender_exclusion)
            .select("CRASH_ID")
            .distinct()
        )

        window = Window.orderBy(col(rank_column).desc())

        return (
            persons
            .join(crashes_with_excluded_gender, "CRASH_ID", "left_anti")
            # `!=` also drops rows with a null gender, as the original filter did.
            .filter((col("PRSN_GNDR_ID") != gender_exclusion) &
                    (col("PRSN_TYPE_ID") == person_type))
            .groupBy(group_by_column)
            .agg(countDistinct("CRASH_ID").alias(rank_column))
            .withColumn("rank", dense_rank().over(window))
            .filter(col("rank") == rank_value)
            .drop("rank")
        )


PROBLEM 6 - Which are the 3rd to 5th top VEH_MAKE_IDs that contribute to the largest number of injuries, including deaths?

In [None]:
from pyspark.sql.functions import sum, col, dense_rank
from pyspark.sql.window import Window

class TopVehicleCategories:
    """Rank vehicle makes by total casualties (injuries plus deaths)."""

    def __init__(self, primary_person_df, units_df):
        # Person-level and unit-level crash records, joined on (CRASH_ID, UNIT_NBR).
        self.primary_person_df = primary_person_df
        self.units_df = units_df

    def get_top_vehicle_categories(self, rank_lower, rank_upper):
        """Return makes whose dense rank by Total_Casualties lies in [rank_lower, rank_upper].

        Per VEH_MAKE_ID (excluding 'NA'): Total_Injuries = sum(TOT_INJRY_CNT),
        Total_Deaths = sum(DEATH_CNT), Total_Casualties = their sum.
        """
        persons, units = self.primary_person_df, self.units_df
        by_casualties = Window.orderBy(col("Total_Casualties").desc())

        # `sum` here is pyspark.sql.functions.sum, imported at the top of this cell.
        totals = (
            persons.join(units, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(units.VEH_MAKE_ID != 'NA')
            .groupBy(units.VEH_MAKE_ID)
            .agg(
                sum(persons.TOT_INJRY_CNT).alias("Total_Injuries"),
                sum(persons.DEATH_CNT).alias("Total_Deaths"),
            )
            .withColumn("Total_Casualties", col("Total_Injuries") + col("Total_Deaths"))
        )

        ranked = totals.withColumn("rank", dense_rank().over(by_casualties))
        in_range = (col("rank") >= rank_lower) & (col("rank") <= rank_upper)
        return ranked.filter(in_range).drop("rank")


Problem 7 - For all the body styles involved in crashes, mention the top ethnic user group of each unique body style.

In [None]:
from pyspark.sql.functions import countDistinct, col, dense_rank
from pyspark.sql.window import Window

class CountCrashes:
    """For each vehicle body style, find the ethnic group(s) at a given crash-count rank."""

    def __init__(self, primary_person_df, units_df):
        # Person-level and unit-level crash records, joined on (CRASH_ID, UNIT_NBR).
        self.primary_person_df = primary_person_df
        self.units_df = units_df

    def count_crashes(self, rank):
        """Return (VEH_BODY_STYL_ID, PRSN_ETHNICITY_ID) pairs at dense rank ``rank``.

        Distinct crashes are counted per (body style, ethnicity) and ranked in
        descending order within each body style. Ties share a rank, so a body style
        may yield several rows. Placeholder body styles and ethnicities are dropped.
        """
        excluded_body_styles = ["NA", "UNKNOWN", "NOT REPORTED", "OTHER  (EXPLAIN IN NARRATIVE)"]
        excluded_ethnicities = ["NA", "UNKNOWN"]
        per_body_style = Window.partitionBy("VEH_BODY_STYL_ID").orderBy(col("Count_of_crashes").desc())

        persons, units = self.primary_person_df, self.units_df
        known_values = (
            ~units.VEH_BODY_STYL_ID.isin(excluded_body_styles)
            & ~persons.PRSN_ETHNICITY_ID.isin(excluded_ethnicities)
        )

        ranked = (
            persons.join(units, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(known_values)
            .groupBy("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID")
            .agg(countDistinct("CRASH_ID").alias("Count_of_crashes"))
            .withColumn("rank", dense_rank().over(per_body_style))
        )
        return ranked.filter(col("rank") == rank).drop("rank", "Count_of_crashes")


Problem 8 - Among the crashed cars, what are the Top 5 Zip Codes with the highest number of crashes with alcohol as the contributing factor to a crash (use the Driver Zip Code)?

In [None]:
from pyspark.sql.functions import countDistinct, instr, lower, col, dense_rank
from pyspark.sql.window import Window

class DrunkAndDriveCases:
    """Rank zip codes by crashes (of cars, by default) with alcohol as a contributing factor."""

    def __init__(self, primary_person_df, units_df, zip_code_col="DRVR_ZIP", rank_lower=1, rank_upper=5, body_style_keyword="car"):
        """Store inputs and options.

        Args:
            primary_person_df: person-level records; must contain ``zip_code_col``.
            units_df: unit-level records with CONTRIB_FACTR_1_ID, CONTRIB_FACTR_P1_ID,
                CONTRIB_FACTR_2_ID and VEH_BODY_STYL_ID.
            zip_code_col: person column used as the zip code (driver zip by default).
            rank_lower, rank_upper: inclusive range of dense ranks to return.
            body_style_keyword: case-insensitive substring that VEH_BODY_STYL_ID must
                contain (restricts the analysis to crashed cars); pass None or "" to
                consider every unit.
        """
        self.primary_person_df = primary_person_df
        self.units_df = units_df
        self.zip_code_col = zip_code_col
        self.rank_lower = rank_lower
        self.rank_upper = rank_upper
        self.body_style_keyword = body_style_keyword

    def calculate_drunk_and_drive_cases(self):
        """Return zip codes whose dense rank by distinct alcohol-related crashes lies in
        [rank_lower, rank_upper], with the count in a Count_of_crashes column."""
        window = Window.orderBy(col("Count_of_crashes").desc())
        units = self.units_df
        zip_code = self.primary_person_df[self.zip_code_col]

        alcohol_condition = (
            (instr(lower(units.CONTRIB_FACTR_1_ID), "alcohol") > 0) |
            (instr(lower(units.CONTRIB_FACTR_P1_ID), "alcohol") > 0) |
            (instr(lower(units.CONTRIB_FACTR_2_ID), "alcohol") > 0)
        )
        # Exclude both real nulls and the literal string 'NULL'.
        condition = alcohol_condition & zip_code.isNotNull() & (zip_code != 'NULL')

        # The question is about crashed *cars*; without this every unit type counted.
        if self.body_style_keyword:
            is_matching_body = instr(lower(units.VEH_BODY_STYL_ID), self.body_style_keyword.lower()) > 0
            condition = condition & is_matching_body

        return (
            self.primary_person_df.join(units, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(condition)
            .groupby(self.zip_code_col)
            .agg(countDistinct("CRASH_ID").alias("Count_of_crashes"))
            .withColumn("rank", dense_rank().over(window))
            .filter(col("rank").between(self.rank_lower, self.rank_upper))
            .drop("rank")
        )


Problem 9 - Count the distinct Crash IDs where no damaged property was observed, the damage level (VEH_DMAG_SCL~) is above 4, and the car has insurance.

In [None]:
from pyspark.sql.functions import countDistinct, col

class NoDamagePropertyCrashes:
    """Count crashes with severe vehicle damage, no damaged property, and insurance."""

    # VEH_DMAG_SCL_* values strictly above "DAMAGED 4".
    SEVERE_DAMAGE_LEVELS = ("DAMAGED 5", "DAMAGED 6", "DAMAGED 7 HIGHEST")
    # FIN_RESP_TYPE_ID values treated as "car avails insurance".
    INSURANCE_TYPES = ("LIABILITY INSURANCE POLICY", "PROOF OF LIABILITY INSURANCE")

    def __init__(self, units_df, damages_df):
        # Unit-level crash records and damaged-property records, joined on CRASH_ID.
        self.units_df = units_df
        self.damages_df = damages_df

    def calculate_no_damage_property_crashes(self):
        """Return a one-row DataFrame with Count_of_crashes: the distinct CRASH_IDs where
        either damage-scale column is above level 4, DAMAGED_PROPERTY == "NONE" and the
        financial-responsibility type is an insurance policy.

        NOTE(review): a crash with one "NONE" damages row and another row naming real
        property still qualifies here; confirm whether it should be excluded.
        """
        units, damages = self.units_df, self.damages_df
        severe = list(self.SEVERE_DAMAGE_LEVELS)

        return (
            # Previously joined the global `damages_df`, ignoring the constructor argument.
            units.join(damages, "CRASH_ID", "inner")
            .filter(
                (units.VEH_DMAG_SCL_1_ID.isin(severe) | units.VEH_DMAG_SCL_2_ID.isin(severe)) &
                (damages.DAMAGED_PROPERTY == "NONE") &
                units.FIN_RESP_TYPE_ID.isin(list(self.INSURANCE_TYPES))
            )
            .agg(countDistinct("CRASH_ID").alias("Count_of_crashes"))
        )

Problem 10 - Determine the Top 5 Vehicle Makes where drivers are charged with speeding-related offences, have valid licences, use one of the top 10 vehicle colours, and drive cars licensed in one of the Top 25 states with the highest number of offences (to be deduced from the data).

In [None]:
from pyspark.sql.functions import countDistinct, col, dense_rank, instr, lower
from pyspark.sql.window import Window

class TopOffenses:
    """Find top vehicle makes for speeding charges among licensed drivers, restricted to
    the most common vehicle colours and licence states among charged crashes."""

    # Licence definition: an accepted licence type AND a class that is not disqualifying.
    VALID_LIC_TYPES = ("DRIVER LICENSE", "COMMERCIAL DRIVER LIC", "OCCUPATIONAL")
    INVALID_LIC_CLASSES = ("UNLICENSED", "NA", "UNKNOWN")

    def __init__(self, primary_person_df, units_df, charges_df):
        # Person-, unit- and charge-level crash records.
        self.primary_person_df = primary_person_df
        self.units_df = units_df
        self.charges_df = charges_df

    @staticmethod
    def _top_n(df, group_col, limit):
        """Group ``df`` by ``group_col`` and keep groups whose dense rank by distinct
        CRASH_ID count is <= ``limit``; returns only the ``group_col`` column."""
        window = Window.orderBy(col("Count_of_crashes_having_charges").desc())
        return (
            df.groupBy(group_col)
            # Using-joins merge CRASH_ID into one column, so the plain name is unambiguous.
            .agg(countDistinct("CRASH_ID").alias("Count_of_crashes_having_charges"))
            .withColumn("rank", dense_rank().over(window))
            .filter(col("rank") <= limit)
            .drop("rank", "Count_of_crashes_having_charges")
        )

    def top_states(self, limit=25):
        """Return the VEH_LIC_STATE_ID values in the top ``limit`` dense ranks by charged crashes.

        Ranks the vehicle licence state (not the driver licence state). That is the
        column top_vehicle_makers filters on, so the returned values are comparable.
        """
        units, charges = self.units_df, self.charges_df
        charged_units = (
            units.join(charges, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(~units.VEH_LIC_STATE_ID.isin("Unknown", "NA", "Other"))
        )
        top_df = self._top_n(charged_units, "VEH_LIC_STATE_ID", limit)
        return [row.VEH_LIC_STATE_ID for row in top_df.collect()]

    def top_colors(self, limit=10):
        """Return the VEH_COLOR_ID values in the top ``limit`` dense ranks by charged crashes."""
        units, charges = self.units_df, self.charges_df
        charged_units = (
            units.join(charges, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(~units.VEH_COLOR_ID.isin("NA"))
        )
        top_df = self._top_n(charged_units, "VEH_COLOR_ID", limit)
        return [row.VEH_COLOR_ID for row in top_df.collect()]

    def top_vehicle_makers(self, states, colors, limit=5):
        """Return a DataFrame of VEH_MAKE_ID values in the top ``limit`` dense ranks by
        distinct crashes with a speeding charge, a licensed person, a vehicle colour in
        ``colors`` and a vehicle licence state in ``states``."""
        persons, charges, units = self.primary_person_df, self.charges_df, self.units_df

        # AND (not OR), matching the valid-licence definition used for hit-and-run.
        is_licensed = (
            persons.DRVR_LIC_TYPE_ID.isin(list(self.VALID_LIC_TYPES))
            & ~persons.DRVR_LIC_CLS_ID.isin(list(self.INVALID_LIC_CLASSES))
        )

        speeding_units = (
            persons.join(charges, ["CRASH_ID", "PRSN_NBR", "UNIT_NBR"], "inner")
            .join(units, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(
                is_licensed &
                (instr(lower(charges.CHARGE), "speed") > 0) &
                units.VEH_COLOR_ID.isin(colors) &
                units.VEH_LIC_STATE_ID.isin(states)
            )
        )
        return self._top_n(speeding_units, "VEH_MAKE_ID", limit)




Calling the main function and instantiating the corresponding class for each problem

In [None]:
if __name__ == "__main__":

    # Instantiate the setup helper that handles input reading and output writing.
    data_setup = DataProcessor()

    # NOTE(review): primary_person_df, units_df, charges_df, damages_df, write_output and
    # output_format are not defined anywhere in this notebook. Presumably they should come
    # from `data_setup`, but DataProcessor's API is not visible here. TODO: wire them up.
    # Fail fast with one clear message instead of a NameError partway through the run.
    _required_names = ("primary_person_df", "units_df", "charges_df", "damages_df",
                       "write_output", "output_format")
    _missing_names = [name for name in _required_names if name not in globals()]
    if _missing_names:
        raise NameError("Define these before running the analyses: " + ", ".join(_missing_names))

    def _scalar_df(value, column_name):
        """Wrap a scalar result in a one-row DataFrame so it can go through write_output,
        which every other problem calls with a DataFrame."""
        return spark.createDataFrame([(value,)], [column_name])

    # 1. Find the number of crashes (accidents) in which the number of males killed is greater than 2.
    analysis = CrashAnalysis(primary_person_df)
    male_death_crash_count = analysis.run_analysis(gender='MALE', threshold=2)
    # run_analysis returns an int, which has no .show().
    print("Crashes with more than 2 males killed:", male_death_crash_count)
    write_output(_scalar_df(male_death_crash_count, "Count_of_crashes"), '1', output_format)

    # 2. How many two-wheelers are booked for crashes?
    two_wheeler_counter = TwoWheelerCounter(units_df)
    two_wheelers_df = two_wheeler_counter.two_wheelers()
    print("Count of two-wheelers:", two_wheelers_df.count())
    write_output(two_wheelers_df, '2', output_format)

    # 3. Determine the Top 5 Vehicle Makes of the cars present in the crashes in which a driver
    # died and the airbags did not deploy.
    analysis = VehicleAirbagAnalysis(primary_person_df, units_df)
    df_top_vehicles_without_airbag = analysis.filter_and_aggregate(
        body_style_keyword="car",
        person_type="DRIVER",
        death_count=1,
        airbag_status="NOT DEPLOYED",
        rank_threshold=5,
    )
    df_top_vehicles_without_airbag.show()
    write_output(df_top_vehicles_without_airbag, '3', output_format)

    # 4. Determine the number of vehicles with a driver having a valid licence involved in hit-and-run.
    analysis = DriverLicenseAnalysis(primary_person_df, units_df)
    hit_and_run_licensed_count = analysis.filter_and_count(
        person_type="DRIVER",
        veh_honor_flag="Y",
        valid_lic_types=["DRIVER LICENSE", "COMMERCIAL DRIVER LIC", "OCCUPATIONAL"],
        invalid_lic_classes=["UNLICENSED", "NA", "UNKNOWN"],
    )
    print(hit_and_run_licensed_count)
    # Previously wrote Problem 3's DataFrame under id '4'.
    write_output(_scalar_df(hit_and_run_licensed_count, "Count_of_vehicles"), '4', output_format)

    # 5. Which state has the highest number of accidents in which females are not involved?
    analysis = AccidentAnalysis(primary_person_df)
    df_states_having_highest_accidents_without_women = analysis.filter_and_aggregate(
        gender_exclusion="FEMALE",
        person_type="DRIVER",
        group_by_column="DRVR_LIC_STATE_ID",
        rank_column="Non Women Driver Accident Cases",
        rank_value=1,
    )
    df_states_having_highest_accidents_without_women.show()
    write_output(df_states_having_highest_accidents_without_women, '5', output_format)

    # 6. Which are the 3rd to 5th top VEH_MAKE_IDs that contribute to the largest number of
    # injuries, including deaths?
    top_vehicle_categories = TopVehicleCategories(primary_person_df, units_df)
    rank_lower = 3
    rank_upper = 5
    df_top_vehicle_cat = top_vehicle_categories.get_top_vehicle_categories(rank_lower, rank_upper)
    df_top_vehicle_cat.show()
    write_output(df_top_vehicle_cat, '6', output_format)

    # 7. For all the body styles involved in crashes, mention the top ethnic user group of each
    # unique body style.
    count_crashes = CountCrashes(primary_person_df, units_df)
    df_count_of_crashes = count_crashes.count_crashes(1)
    df_count_of_crashes.show()
    write_output(df_count_of_crashes, '7', output_format)

    # 8. Among the crashed cars, what are the Top 5 Zip Codes with the highest number of crashes
    # with alcohol as the contributing factor to a crash (use the Driver Zip Code)?
    drunk_and_drive_cases = DrunkAndDriveCases(primary_person_df, units_df, zip_code_col="DRVR_ZIP", rank_lower=1, rank_upper=5)
    result_df = drunk_and_drive_cases.calculate_drunk_and_drive_cases()
    result_df.show()
    write_output(result_df, '8', output_format)

    # 9. Count of distinct Crash IDs where no damaged property was observed, the damage level
    # (VEH_DMAG_SCL~) is above 4, and the car has insurance.
    no_damage_property_crashes = NoDamagePropertyCrashes(units_df, damages_df)
    result_df = no_damage_property_crashes.calculate_no_damage_property_crashes()
    result_df.show()
    write_output(result_df, '9', output_format)

    # 10. Determine the Top 5 Vehicle Makes where drivers are charged with speeding-related
    # offences, have licences, use one of the top 10 vehicle colours and have cars licensed in
    # one of the Top 25 states with the highest number of offences (deduced from the data).
    top_offenses = TopOffenses(primary_person_df, units_df, charges_df)
    top_states = top_offenses.top_states()
    top_colors = top_offenses.top_colors()
    top_makers = top_offenses.top_vehicle_makers(top_states, top_colors)
    print("Top vehicle makers with offenses:")
    top_makers.show()
    # Previously written under id '9', overwriting Problem 9's output.
    write_output(top_makers, '10', output_format)

    spark.stop()