import related modules/libraries/classes

In [1]:
# All built in imports
import os
import sys

# Get the current working directory
current_dir = os.getcwd()

# Navigate to the project root directory (one level up from 'solutions')
project_root = os.path.dirname(current_dir)

# Add the project root to sys.path for dynamic imports
sys.path.append(project_root)

print("Current Working Directory:", current_dir)
print("Project Root Directory:", project_root)


# All custom imports
from modules.setup import read_all_data

Current Working Directory: c:\Users\Ankit\Desktop\case_study_bcg\solutions
Project Root Directory: c:\Users\Ankit\Desktop\case_study_bcg


Initialize spark and read all data from raw files

In [2]:
df_charges,df_damages,df_endorse_use,df_primary_person_use,df_restrict_use,df_units_use = read_all_data()

Spark version is: 3.5.3

------ Printing config below ------
{'input_files': {'charges': {'file_type': 'csv', 'input_file_path': '../raw_data/Charges_use.csv'}, 'damages': {'file_type': 'csv', 'input_file_path': '../raw_data/Damages_use.csv'}, 'endorse_use': {'file_type': 'csv', 'input_file_path': '../raw_data/Endorse_use.csv'}, 'primary_person_use': {'file_type': 'csv', 'input_file_path': '../raw_data/Primary_Person_use.csv'}, 'restrict_use': {'file_type': 'csv', 'input_file_path': '../raw_data/Restrict_use.csv'}, 'units_use': {'file_type': 'csv', 'input_file_path': '../raw_data/Units_use.csv'}}}
------ Config ends ------ 

Reading dataset: charges:
+--------+--------+--------+--------------------+------------+
|CRASH_ID|UNIT_NBR|PRSN_NBR|              CHARGE|CITATION_NBR|
+--------+--------+--------+--------------------+------------+
|14768622|       1|       1|DRIVING WHILE INT...|        NULL|
+--------+--------+--------+--------------------+------------+
only showing top 1 row





Initialize sparkSession and read config file

Analytics 1: Find the number of crashes (accidents) in which number of males killed are greater than 2?

In [3]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import count

class CrashCount:
    # read_all_data()
    def __init__(self, primary_person_df: DataFrame):
        self.primary_person_df = primary_person_df

    def run_analysis(self, gender: str = 'MALE', threshold: int = 2) -> int:
        """
        Run the analysis to count crashes where the number of deaths per crash 
        for a specific gender exceeds a threshold.

        Args:
            gender (str): The gender to filter (default: 'MALE').
            threshold (int): The minimum number of deaths per crash to count (default: 2).

        Returns:
            int: The count of crashes meeting the criteria.
        """
        # Filter DataFrame for the specified gender and death count
        filtered_df = self.primary_person_df.filter(
            (self.primary_person_df["PRSN_GNDR_ID"] == gender) &
            (self.primary_person_df["DEATH_CNT"] == 1)            # filtering if death occured
        )
        
        # Count the number of deaths per crash
        count_df = filtered_df.groupBy("CRASH_ID").agg(count("*").alias("death_per_crash"))
        
        # Filter crashes where death count exceeds the threshold
        filtered_count_df = count_df.filter(count_df["death_per_crash"] > threshold)
        
        # Return the count of such crashes
        return filtered_count_df.count()


solution1 = CrashCount(df_primary_person_use)

print("Number of crashes in which males killed are greater than 2 :", solution1.run_analysis('MALE', 2))



Number of crashes in which males killed are greater than 2 : 0


2.	Analysis 2: How many two wheelers are booked for crashes? 

In [4]:
from pyspark.sql.functions import instr, lower

class CountTwoWheelers:
    def __init__(self, df_units_use):
        self.df_units_use = df_units_use
        
    def two_wheelers(self):
        return self.df_units_use.filter(
            (instr(lower(self.df_units_use.VEH_BODY_STYL_ID), "motorcycle") > 0) |
            (self.df_units_use.UNIT_DESC_ID == "PEDALCYCLIST")
        ).count()

counttwowheelers  = CountTwoWheelers(df_units_use)
print("No. of two wheelers booked for crashes :", counttwowheelers.two_wheelers())

No. of two wheelers booked for crashes : 948


3.	Analysis 3: Determine the Top 5 Vehicle Makes of the cars present in the crashes in which driver died and Airbags did not deploy.

In [5]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import instr, lower, count, dense_rank, col
from pyspark.sql.window import Window

class CarMakeEvaluation:
    def __init__(self, primary_person_df: DataFrame, units_df: DataFrame):
        self.primary_person_df = primary_person_df
        self.units_df = units_df

    def filter_and_aggregate(self, body_style_keyword: str, person_type: str, death_count: int, airbag_status: str, rank_threshold: int = 5) -> DataFrame:
        window = Window.orderBy(col("Death_per_vehicle_make_without_airbags").desc())
        
        return (
            self.primary_person_df
            .join(self.units_df, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(
                (instr(lower(self.units_df.VEH_BODY_STYL_ID), body_style_keyword) > 0) &
                (self.primary_person_df.PRSN_TYPE_ID == person_type) &
                (self.primary_person_df.DEATH_CNT == death_count) &
                (self.primary_person_df.PRSN_AIRBAG_ID == airbag_status)
            )
            .groupBy(self.units_df.VEH_MAKE_ID)
            .agg(count("*").alias("Death_per_vehicle_make_without_airbags"))
            .withColumn("rank", dense_rank().over(window))
            .filter(col("rank") < rank_threshold + 1)
            .drop("rank")
        )
    
carmakeevaluation = CarMakeEvaluation(df_primary_person_use, df_units_use)
carmakeevaluation.filter_and_aggregate(body_style_keyword ='car', 
    person_type="DRIVER", 
    death_count=1, 
    airbag_status="NOT DEPLOYED",
    rank_threshold=5
    ).show()

+-------------+--------------------------------------+
|  VEH_MAKE_ID|Death_per_vehicle_make_without_airbags|
+-------------+--------------------------------------+
|       NISSAN|                                     4|
|    CHEVROLET|                                     3|
|         FORD|                                     2|
|        HONDA|                                     2|
|     CADILLAC|                                     1|
|      PONTIAC|                                     1|
|MERCEDES-BENZ|                                     1|
|        BUICK|                                     1|
|          KIA|                                     1|
|     CHRYSLER|                                     1|
+-------------+--------------------------------------+



4.	Analysis 4: Determine number of Vehicles with driver having valid licences involved in hit and run? 

In [6]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

class DriverHitAndRunAnalysis:
    def __init__(self, primary_person_df: DataFrame, units_df: DataFrame):
        self.primary_person_df = primary_person_df
        self.units_df = units_df

    def filter_and_count(self, person_type: str, veh_honor_flag: str, valid_lic_types: list, invalid_lic_classes: list) -> int:
        return (
            self.primary_person_df
            .join(self.units_df, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(
                (self.primary_person_df.PRSN_TYPE_ID == person_type) &
                (self.units_df.VEH_HNR_FL == veh_honor_flag) &
                (
                    self.primary_person_df.DRVR_LIC_TYPE_ID.isin(valid_lic_types) &
                    (~self.primary_person_df.DRVR_LIC_CLS_ID.isin(invalid_lic_classes))
                )
            )
            .select("CRASH_ID", "UNIT_NBR")
            .distinct()
            .count()
        )


driverhitandrun = DriverHitAndRunAnalysis(df_primary_person_use, df_units_use)
driverhitandrun.filter_and_count(
    person_type="DRIVER",
    veh_honor_flag="Y",
    valid_lic_types=["DRIVER LICENSE", "COMMERCIAL DRIVER LIC", "OCCUPATIONAL"],
    invalid_lic_classes=["UNLICENSED", "NA", "UNKNOWN"]
    )


2450

5.	Analysis 5: Which state has highest number of accidents in which females are not involved? 

In [7]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import countDistinct, dense_rank, col
from pyspark.sql.window import Window

class StateWithHighestAccidents:
    def __init__(self, primary_person_df: DataFrame):
        self.primary_person_df = primary_person_df

    def filter_and_aggregate(self, gender_exclusion: str, person_type: str, group_by_column: str, rank_column: str, rank_value: int = 1) -> DataFrame:
        window = Window.orderBy(col(rank_column).desc())
        
        return (
            self.primary_person_df
            .filter((self.primary_person_df.PRSN_GNDR_ID != gender_exclusion) &
                    (self.primary_person_df.PRSN_TYPE_ID == person_type))
            .groupBy(group_by_column)
            .agg(countDistinct("CRASH_ID").alias(rank_column))
            .withColumn("rank", dense_rank().over(window))
            .filter(col("rank") == rank_value)
            .drop("rank")
        )
    
statewithhighestacc = StateWithHighestAccidents(df_primary_person_use)
df_states_having_highest_accidents_without_women = statewithhighestacc.filter_and_aggregate(
    gender_exclusion="FEMALE",
    person_type="DRIVER",
    group_by_column="DRVR_LIC_STATE_ID",
    rank_column="Non Women Driver Accident Cases",
    rank_value=1)

df_states_having_highest_accidents_without_women.show()

+-----------------+-------------------------------+
|DRVR_LIC_STATE_ID|Non Women Driver Accident Cases|
+-----------------+-------------------------------+
|            Texas|                          61022|
+-----------------+-------------------------------+



6.	Analysis 6: Which are the Top 3rd to 5th VEH_MAKE_IDs that contribute to a largest number of injuries including death

In [8]:
from pyspark.sql.functions import col, sum, dense_rank
from pyspark.sql.window import Window

class TopVehicleCategories:
    def __init__(self, df_units_use):
        self.df_units_use = df_units_use

    def get_top_reqd_categories(self, lower_range, upper_range):
        # Filter rows where DEATH_CNT is not 0 and VEH_MAKE_ID is not 'NA'
        df_filtered = self.df_units_use.filter(
            (col("DEATH_CNT") != 0) & (col("VEH_MAKE_ID") != 'NA')
        )
        
        # Aggregate data and calculate total injuries per vehicle make
        df_aggregated = df_filtered.groupBy("VEH_MAKE_ID").agg(
            sum("TOT_INJRY_CNT").alias("total_injuries_sum_per_make")
        )
        
        # Create a ranking column based on total injuries
        window_spec = Window.orderBy(col("total_injuries_sum_per_make").desc())
        df_ranked = df_aggregated.withColumn(
            "rank", dense_rank().over(window_spec)
        )
        
        # Filter based on the rank range
        df_top_cat = df_ranked.filter(
            (col("rank") >= lower_range) & (col("rank") <= upper_range)
        )
        
        return df_top_cat

# Call
topcategories = TopVehicleCategories(df_units_use)
topcategories.get_top_reqd_categories(3, 5).show()

+-----------+---------------------------+----+
|VEH_MAKE_ID|total_injuries_sum_per_make|rank|
+-----------+---------------------------+----+
|       FORD|                       12.0|   3|
|      DODGE|                       10.0|   4|
|     TOYOTA|                        9.0|   5|
+-----------+---------------------------+----+



7.	Analysis 7: For all the body styles involved in crashes, mention the top ethnic user group of each unique body style  

In [9]:
from pyspark.sql.functions import countDistinct, col, dense_rank
from pyspark.sql.window import Window

class CrashesPerEthnic:
    def __init__(self, primary_person_df, units_df):
        self.primary_person_df = primary_person_df
        self.units_df = units_df
        
    def count_crashes(self, rank):
        
        df_count_of_crashes = (
            self.primary_person_df.join(self.units_df, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(
                (~self.units_df.VEH_BODY_STYL_ID.isin(["NA", "UNKNOWN", "NOT REPORTED", "OTHER  (EXPLAIN IN NARRATIVE)"])) &
                (~self.primary_person_df.PRSN_ETHNICITY_ID.isin(["NA", "UNKNOWN"]))
            )
            .groupBy("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID")
            .agg(countDistinct("CRASH_ID").alias("Count_of_crashes"))
            .withColumn("rank", dense_rank().over(
                Window.partitionBy("VEH_BODY_STYL_ID").orderBy(col("Count_of_crashes").desc())
                )
            )
            .filter(col("rank") == rank)
            .drop("rank", "Count_of_crashes")
        )
        
        return df_count_of_crashes
    
crashperethnic = CrashesPerEthnic(df_primary_person_use,df_units_use)
df_count_of_crashes = crashperethnic.count_crashes(1)
df_count_of_crashes.show(5)


+----------------+-----------------+
|VEH_BODY_STYL_ID|PRSN_ETHNICITY_ID|
+----------------+-----------------+
|       AMBULANCE|            WHITE|
|             BUS|            BLACK|
|  FARM EQUIPMENT|            WHITE|
|      FIRE TRUCK|            WHITE|
|      MOTORCYCLE|            WHITE|
+----------------+-----------------+
only showing top 5 rows



8.	Analysis 8: Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash (Use Driver Zip Code)

In [10]:
from pyspark.sql.functions import countDistinct, instr, lower, col, dense_rank
from pyspark.sql.window import Window

class DrunkAndDriveCases:
    def __init__(self, primary_person_df, units_df, zip_code_col="DRVR_ZIP",value = 5):
        self.primary_person_df = primary_person_df
        self.units_df = units_df
        self.zip_code_col = zip_code_col
        self.value = value

    def calculate_drunk_and_drive_cases(self):
        
        alcohol_condition = (
            (instr(lower(self.units_df.CONTRIB_FACTR_1_ID), "alcohol") > 0) |
            (instr(lower(self.units_df.CONTRIB_FACTR_P1_ID), "alcohol") > 0) |
            (instr(lower(self.units_df.CONTRIB_FACTR_2_ID), "alcohol") > 0)
        )
        
        df_drunk_and_drive_cases = (
            self.primary_person_df.join(self.units_df, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(
                alcohol_condition &
                (self.primary_person_df[self.zip_code_col] != 'NULL') &
                (instr(lower(self.units_df.VEH_BODY_STYL_ID), 'car') > 0)
            )
            .groupBy(self.zip_code_col)
            .agg(countDistinct("CRASH_ID").alias("Count_of_crashes"))
            .withColumn("rank", dense_rank().over(
                Window.orderBy(col("Count_of_crashes").desc())
            ))
            .filter(col("rank") <= self.value)
            # .drop("rank")
        )
        
        return df_drunk_and_drive_cases
    
drunk_drive_cases = DrunkAndDriveCases(df_primary_person_use, df_units_use, zip_code_col="DRVR_ZIP",value = 5)
drunk_drive_cases.calculate_drunk_and_drive_cases().show()


+--------+----------------+----+
|DRVR_ZIP|Count_of_crashes|rank|
+--------+----------------+----+
|   75052|              27|   1|
|   75067|              26|   2|
|   76010|              26|   2|
|   78521|              24|   3|
|   78130|              21|   4|
|   78550|              20|   5|
|   78745|              20|   5|
+--------+----------------+----+



9.	Analysis 9: Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance

In [11]:
from pyspark.sql.functions import countDistinct, col

class CrashIdNoDamageProperty:
    def __init__(self, units_df, damages_df):
        self.units_df = units_df
        self.damages_df = damages_df
        
    def calculate_crashes(self):
        df_no_damage_property = (
            self.units_df.join(self.damages_df, "CRASH_ID", "inner")
            .filter(
                (
                    ((self.units_df.VEH_DMAG_SCL_1_ID).isin("DAMAGED 5", "DAMAGED 6", "DAMAGED 7 HIGHEST")) |
                    ((self.units_df.VEH_DMAG_SCL_2_ID).isin("DAMAGED 5", "DAMAGED 6", "DAMAGED 7 HIGHEST"))
                ) &
                (self.damages_df.DAMAGED_PROPERTY == "NONE") &
                (self.units_df.FIN_RESP_TYPE_ID.isin("LIABILITY INSURANCE POLICY", "PROOF OF LIABILITY INSURANCE"))
            )
            .agg(countDistinct("CRASH_ID").alias("Count_of_distinct_crashIds"))
        )
        
        return df_no_damage_property
    
crash_id_no_damage_property = CrashIdNoDamageProperty(df_units_use, df_damages)
crash_id_no_damage_property.calculate_crashes().show()

+--------------------------+
|Count_of_distinct_crashIds|
+--------------------------+
|                         8|
+--------------------------+



10.	Analysis 10: Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, used top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)

In [12]:
from pyspark.sql.functions import countDistinct, col, dense_rank, instr, lower
from pyspark.sql.window import Window

class ReqdTop5VehicleMakes:
    def __init__(self, primary_person_df, units_df, charges_df):
        self.primary_person_df = primary_person_df
        self.units_df = units_df
        self.charges_df = charges_df
    
    def top25_states(self, limit=25):

        top_states_df = (
            self.primary_person_df.filter(~self.primary_person_df.DRVR_LIC_STATE_ID.isin("Unknown", "NA", "Other"))
            .join(self.charges_df, ["CRASH_ID", "PRSN_NBR", "UNIT_NBR"], "inner")
            .groupBy("DRVR_LIC_STATE_ID")
            .agg(countDistinct(self.charges_df.CRASH_ID).alias("Count_of_crashes_having_charges"))
            .withColumn("rank", dense_rank().over(
                Window.orderBy(col("Count_of_crashes_having_charges").desc())
            ))
            .filter(col("rank") <= limit)
            .drop("rank", "Count_of_crashes_having_charges")
        )
        return [row.DRVR_LIC_STATE_ID for row in top_states_df.collect()]

    
    def top10_colors(self, limit=10):
        window = Window.orderBy(col("Count_of_crashes_having_charges").desc())
        top_colors_df = (
            self.units_df.filter(~self.units_df.VEH_COLOR_ID.isin("NA"))
            .join(self.charges_df, ["CRASH_ID", "UNIT_NBR"], "inner")
            .groupBy("VEH_COLOR_ID")
            .agg(countDistinct(self.charges_df.CRASH_ID).alias("Count_of_crashes_having_charges"))
            .withColumn("rank", dense_rank().over(window))
            .filter(col("rank") <= limit)
            .drop("rank", "Count_of_crashes_having_charges")
        )
        return [row.VEH_COLOR_ID for row in top_colors_df.collect()]
    
    def top_vehicle_makers(self, states, colors, limit=5):
        window = Window.orderBy(col("Count_of_crashes_having_charges").desc())
        top_makers_df = (
            self.primary_person_df.join(self.charges_df, ["CRASH_ID", "PRSN_NBR", "UNIT_NBR"], "inner")
            .join(self.units_df, ["CRASH_ID", "UNIT_NBR"], "inner")
            .filter(
                (
                    (self.primary_person_df.DRVR_LIC_TYPE_ID.isin("DRIVER LICENSE", "COMMERCIAL DRIVER LIC", "OCCUPATIONAL")) |
                    (~self.primary_person_df.DRVR_LIC_CLS_ID.isin("UNLICENSED", "NA", "UNKNOWN"))
                ) &
                (instr(lower(self.charges_df.CHARGE), "speed") > 0) &
                (self.units_df.VEH_COLOR_ID.isin(colors)) &
                (self.units_df.VEH_LIC_STATE_ID.isin(states))
            )
            .groupBy("VEH_MAKE_ID")
            .agg(countDistinct(self.charges_df.CRASH_ID).alias("Count_of_crashes_having_charges"))
            .withColumn("rank", dense_rank().over(window))
            .filter(col("rank") <= limit)
            .drop("rank", "Count_of_crashes_having_charges")
        )
        return top_makers_df
    
vehicle_makes = ReqdTop5VehicleMakes(df_primary_person_use, df_units_use, df_charges)
top25_states = vehicle_makes.top25_states()
top10_colors = vehicle_makes.top10_colors()

top_makers = vehicle_makes.top_vehicle_makers(top25_states, top10_colors)

print("Top 5 vehicle makers with speeding related offenses:")
top_makers.show()   


Top 5 vehicle makers with speeding related offenses:
+-----------+
|VEH_MAKE_ID|
+-----------+
+-----------+

