In [21]:
%run ./UTILS/modules.ipynb

In [22]:
class ToGold():
    # Analytics over the SILVER datasets; each method hands its result to write_df_to_csv under GOLD_PATH.
    def __init__(self):
        """
            Loads the configuration via read_yaml() and reads every
            SILVER dataset listed below into an attribute of the same name.
            Parameter: -
            Returns:   -
        """
        self.config = read_yaml()
        silver_paths = self.config['SILVER_PATHS']

        # (attribute name, key under SILVER_PATHS) -- read in this order
        datasets = (
            ('primary_person', 'PRIMARY_PERSON_USE_PATH'),
            ('units', 'UNITS_USE_PATH'),
            ('charges', 'CHARGES_USE_PATH'),
            ('endorse', 'ENDORSE_USE_PATH'),
            ('damages', 'DAMAGES_USE_PATH'),
            ('restrict', 'RESTRICT_USE_PATH'),
        )
        for attr_name, path_key in datasets:
            setattr(self, attr_name, read_parquet(silver_paths[path_key]))
       
   
    def accCountMalesKilled(self):
        """
            Finds the crashes in which more than two males were killed and
            hands the result to write_df_to_csv under GOLD_PATH.
            A failure while writing is logged (with traceback) and does not
            prevent the result from being returned.
            Parameter: -
            Returns:   DataFrame of CRASH_IDs where male death count > 2
        """
        # Local import so the method does not depend on what the %run'ed
        # notebook happens to expose.
        import logging

        # Sum DEATH_CNT per crash over person rows that are male and killed.
        male_deaths = self.primary_person.filter((col('PRSN_GNDR_ID')=='MALE')
                                                 &(col('PRSN_INJRY_SEV_ID')=='KILLED'))\
                                         .groupby(col('CRASH_ID')).sum('DEATH_CNT')
        result = male_deaths.filter(col('sum(DEATH_CNT)')>2).select('CRASH_ID')

        try:
            path = self.config['GOLD_PATH']
            write_df_to_csv(result,path+'01_male_klld_acc')
        except Exception:
            # Writing is best-effort here: record the failure instead of
            # hiding it, then still return the computed DataFrame.
            logging.exception("Failed to write 01_male_klld_acc")
        return result

    
    def twoWheelerCrashes(self):
        """
            Collects the distinct VINs of units whose body style contains
            'MOTORCYCLE' and whose CRASH_ID also appears in the charges
            data, then hands the result to write_df_to_csv under GOLD_PATH.
            Parameter: -
            Returns:   DataFrame with a single distinct VIN column
        """
        is_motorcycle = col('VEH_BODY_STYL_ID').contains('MOTORCYCLE')
        motorcycles = self.units.filter(is_motorcycle).select('CRASH_ID','VIN').distinct()

        # Only the crash identifier is needed from the charges side.
        charged_crashes = self.charges.select('CRASH_ID').distinct()

        matched = motorcycles.join(charged_crashes,on='CRASH_ID',how='inner')
        result = matched.select('VIN').distinct()

        target = self.config['GOLD_PATH']+'02_two_whlr_chrgd'
        write_df_to_csv(result,target)
        return result


    def carMakersAirBags(self):
        """
            Ranks vehicle makes by the number of crashes in which a person
            was killed and that person's airbag was not deployed, keeps the
            top 5 and hands the result to write_df_to_csv under GOLD_PATH.
            Makes recorded as 'NA' or 'UNKNOWN' are ignored.
            Parameter: -
            Returns:   DataFrame with VEH_MAKE_ID and count (top 5)
        """
        killed_without_airbag = ((col('PRSN_INJRY_SEV_ID')=='KILLED')
                                 &(col('PRSN_AIRBAG_ID')=='NOT DEPLOYED'))
        fatal_crashes = self.primary_person.filter(killed_without_airbag)\
                                           .select('CRASH_ID').distinct()

        has_known_make = ~col("VEH_MAKE_ID").isin(["NA","UNKNOWN"])
        units = self.units.filter(has_known_make)\
                          .select('CRASH_ID','VEH_MAKE_ID').distinct()

        # One row per (crash, make) pair that belongs to a qualifying crash.
        crash_makes = fatal_crashes.join(units, on ='CRASH_ID',how='inner')\
                                   .select(units['*'])
        per_make = crash_makes.groupby('VEH_MAKE_ID').count()
        result = per_make.orderBy(col('count').desc()).limit(5)

        target = self.config['GOLD_PATH']+'03_car_arbgs_dth'
        write_df_to_csv(result,target)
        return result

    def hitAndRun(self):
        """
            Collects the distinct VINs of units flagged as hit-and-run
            (VEH_HNR_FL == 'Y') whose CRASH_ID also has a person holding
            one of the listed licence types, then hands the result to
            write_df_to_csv under GOLD_PATH.
            Parameter: -
            Returns:   DataFrame with a single distinct VIN column
        """
        valid_licences = ['DRIVER LICENSE'
                          ,'COMMERCIAL DRIVER LIC.'
                          ,'OCCUPATIONAL']

        hnr_units = self.units.filter(col('VEH_HNR_FL')=='Y')\
                              .select('CRASH_ID','VIN').distinct()
        licensed_crashes = self.primary_person.filter(col('DRVR_LIC_TYPE_ID').isin(valid_licences))\
                                              .select('CRASH_ID').distinct()

        matched = hnr_units.join(licensed_crashes,on='CRASH_ID',how='inner')
        result = matched.select('VIN').distinct()

        target = self.config['GOLD_PATH']+'04_car_vin_hnr'
        write_df_to_csv(result,target)
        return result

    def stateWithNoFemaleCrashes(self):
        """
            Finds the driver-licence state with the highest number of
            crashes in which no female was involved and hands the result
            to write_df_to_csv under GOLD_PATH.
            A crash is excluded as soon as any of its person rows has
            PRSN_GNDR_ID == 'FEMALE'.
            Parameter: -
            Returns:   DataFrame with DRVR_LIC_STATE_ID and count (1 row)
        """
        # Crashes that have at least one female person row.
        female_crashes = self.primary_person.filter(col('PRSN_GNDR_ID')=='FEMALE')\
                                            .select('CRASH_ID').distinct()

        # Filtering rows on != 'FEMALE' alone would still count a crash that
        # also involved a female; the anti-join removes those crashes entirely.
        no_female = self.primary_person.filter(col('PRSN_GNDR_ID')!='FEMALE')\
                                       .join(female_crashes,on='CRASH_ID',how='left_anti')\
                                       .select('DRVR_LIC_STATE_ID','CRASH_ID')\
                                       .distinct()

        result = no_female.groupby('DRVR_LIC_STATE_ID').count()\
                          .orderBy(col('count').desc()).limit(1)

        path = self.config['GOLD_PATH']
        write_df_to_csv(result,path+'05_no_female_acc_state')
        return result

    def top3To5VehMakers(self):
        """
            Ranks VEH_MAKE_IDs by total injuries including deaths
            (TOT_INJRY_CNT + DEATH_CNT summed per make) and returns the
            makes ranked 3rd to 5th, in rank order. The result is handed
            to write_df_to_csv under GOLD_PATH.
            Parameter: -
            Returns:   DataFrame with VEH_MAKE_ID and TOTAL_INJ (ranks 3-5)
        """
        totals = self.units.withColumn('TOTAL_INJ',col('TOT_INJRY_CNT')+col('DEATH_CNT'))\
                           .groupby('VEH_MAKE_ID').sum('TOTAL_INJ')\
                           .withColumnRenamed("sum(TOTAL_INJ)","TOTAL_INJ")

        # Rank once on the renamed column and slice ranks 3..5 from that
        # single ranking. A set difference of two separate limit() calls
        # gives no output order and can disagree on ties.
        # The window has no partition, so ranking runs on one partition;
        # the input is one row per make at this point.
        by_total = Window.orderBy(col('TOTAL_INJ').desc())
        result = totals.withColumn('rank',row_number().over(by_total))\
                       .filter(col('rank').between(3,5))\
                       .orderBy('rank')\
                       .select('VEH_MAKE_ID','TOTAL_INJ')

        path = self.config['GOLD_PATH']
        write_df_to_csv(result,path+'06_car_mkrs_3to5')
        return result

    def crashesOnVehAndEth(self):
        """
            For every vehicle body style, finds the ethnicity that appears
            in the largest number of crashes together with that style and
            hands the result to write_df_to_csv under GOLD_PATH.
            Body styles and ethnicities recorded as 'UNKNOWN' or 'NA'
            are ignored.
            Parameter: -
            Returns:   DataFrame with VEH_BODY_STYL_ID, PRSN_ETHNICITY_ID
                        and NBR_ACC (one row per body style)
        """
        known_style = ~col('VEH_BODY_STYL_ID').isin("UNKNOWN","NA")
        known_ethnicity = ~col('PRSN_ETHNICITY_ID').isin("UNKNOWN","NA")

        styles = self.units.filter(known_style)\
                           .select('CRASH_ID','VEH_BODY_STYL_ID').distinct()
        ethnicities = self.primary_person.filter(known_ethnicity)\
                                         .select('CRASH_ID','PRSN_ETHNICITY_ID').distinct()

        # Number of joined rows per (body style, ethnicity) pair.
        pair_counts = styles.join(ethnicities, on = 'CRASH_ID',how='inner')\
                            .groupby('VEH_BODY_STYL_ID','PRSN_ETHNICITY_ID').count()

        # row_number keeps exactly one ethnicity per body style.
        per_style = Window.partitionBy("VEH_BODY_STYL_ID").orderBy(col("count").desc())
        ranked = pair_counts.withColumn("rank",row_number().over(per_style))
        result = ranked.withColumnRenamed("count","NBR_ACC")\
                       .filter(col("rank")=='1')\
                       .select('VEH_BODY_STYL_ID','PRSN_ETHNICITY_ID','NBR_ACC')

        target = self.config['GOLD_PATH']+'07_car_styl_acc_eth'
        write_df_to_csv(result,target)
        return result

    def zipCodes_AlcInfluence_Crashes(self):
        """
            Finds the top 5 driver ZIP codes by number of crashes that
            occurred under the influence of alcohol and hands the result
            to write_df_to_csv under GOLD_PATH.
            A crash counts when a person row has a 'Positive' alcohol
            result with a usable DRVR_ZIP AND a unit of the same crash
            lists 'ALCOHOL' in one of its contributing factors.
            Parameter: -
            Returns:   DataFrame with DRVR_ZIP and count (top 5)
        """
        positive_with_zip = ((col('PRSN_ALC_RSLT_ID')=='Positive')
                             & (~col('DRVR_ZIP').isin('NA','UNKNOWN')))
        persons = self.primary_person.filter(positive_with_zip)\
                                     .select('DRVR_ZIP','CRASH_ID').drop_duplicates()

        alcohol_factor = (col('CONTRIB_FACTR_1_ID').contains('ALCOHOL')
                          |col('CONTRIB_FACTR_2_ID').contains('ALCOHOL')
                          |col('CONTRIB_FACTR_P1_ID').contains('ALCOHOL'))
        alcohol_crashes = self.units.filter(alcohol_factor)\
                                    .select('CRASH_ID').drop_duplicates()

        # Inner join so the contributing-factor filter actually restricts the
        # result. A full outer join followed by dropna() kept every person
        # row, matched or not, which made the units filter a no-op.
        # NOTE(review): confirm that requiring both conditions is the intent.
        matched = persons.join(alcohol_crashes,on='CRASH_ID',how='inner')

        result = matched.groupby('DRVR_ZIP').count().orderBy(col('count').desc()).limit(5)

        path = self.config['GOLD_PATH']
        write_df_to_csv(result,path+'08_alc_acc_per_zip')
        return result


    def insAvailCrashes(self):
        """
            Collects the distinct CRASH_IDs of units that carry one of the
            listed insurance types and whose damage scale (either
            VEH_DMAG_SCL_1_ID or VEH_DMAG_SCL_2_ID) is a grade above 4 or
            reports no damage. The result is handed to write_df_to_csv
            under GOLD_PATH.
            Parameter: -
            Returns:   DataFrame with a single distinct CRASH_ID column
        """
        insured = col('FIN_RESP_TYPE_ID').isin(['PROOF OF LIABILITY INSURANCE'
                                                ,'LIABILITY INSURANCE POLICY'
                                                ,'CERTIFICATE OF SELF-INSURANCE'
                                                ,'INSURANCE BINDER'])

        # Match the grade explicitly. A string comparison (> 'DAMAGED 4') is
        # lexicographic and also accepts any unrelated value that sorts after
        # it, such as 'NA' or 'INVALID VALUE'.
        # Assumes grades are spelled 'DAMAGED <n>...' and the no-damage value
        # starts with 'NO DAMAGE' -- TODO confirm against the SILVER data.
        qualifying_grade = r'^(DAMAGED [5-9]|NO DAMAGE)'
        damage_ok = (col('VEH_DMAG_SCL_1_ID').rlike(qualifying_grade)
                     |col('VEH_DMAG_SCL_2_ID').rlike(qualifying_grade))

        result = self.units.filter(insured & damage_ok)\
                           .select('CRASH_ID').distinct()

        path = self.config['GOLD_PATH']
        write_df_to_csv(result,path+'09_ins_whn_dmag')
        return result

    def veh_mkrs_spding(self):
        """
            Determines the top 5 vehicle makes among units that
              - have one of the 10 most frequent vehicle colours, and
              - belong to a crash with a speeding-related charge, and
              - belong to a crash with a licensed driver whose licence
                state is one of the 25 states with the most such crashes.
            All conditions must hold together; crashes are matched on
            CRASH_ID. The result is handed to write_df_to_csv under
            GOLD_PATH.
            Parameter: -
            Returns:   DataFrame with VEH_MAKE_ID and count (top 5)
        """
        valid_licences = ['DRIVER LICENSE'
                          ,'COMMERCIAL DRIVER LIC.'
                          ,'OCCUPATIONAL']
        placeholders = ['NA','UNKNOWN']

        licensed_drivers = self.primary_person.filter((col('PRSN_TYPE_ID').contains('DRIVER'))
                                                      &(col('DRVR_LIC_TYPE_ID').isin(valid_licences)))

        # Top 25 licence states by number of distinct crashes.
        state_rows = licensed_drivers.select('CRASH_ID','DRVR_LIC_STATE_ID').distinct()\
                                     .filter(~col('DRVR_LIC_STATE_ID').isin(placeholders))\
                                     .groupby('DRVR_LIC_STATE_ID').count()\
                                     .orderBy(col('count').desc()).limit(25)\
                                     .select('DRVR_LIC_STATE_ID').collect()
        top25states = [row[0] for row in state_rows]

        # Top 10 vehicle colours by number of distinct crashes.
        colour_rows = self.units.select('VEH_COLOR_ID','CRASH_ID').distinct()\
                                .filter(~col('VEH_COLOR_ID').isin(placeholders))\
                                .groupby('VEH_COLOR_ID').count()\
                                .orderBy(col('count').desc()).limit(10)\
                                .select('VEH_COLOR_ID').collect()
        top10clrs = [row[0] for row in colour_rows]

        licensed_crashes = licensed_drivers.filter(col('DRVR_LIC_STATE_ID').isin(top25states))\
                                           .select('CRASH_ID').distinct()
        speeding_crashes = self.charges.filter(col('CHARGE').contains('SPEED'))\
                                       .select('CRASH_ID').distinct()

        # intersect, not union: a crash must satisfy every criterion.
        qualifying_crashes = licensed_crashes.intersect(speeding_crashes)

        # The colour filter uses the colour list and applies to the unit
        # itself, so only units with a top-10 colour are counted.
        result = self.units.filter(col('VEH_COLOR_ID').isin(top10clrs))\
            .join(qualifying_crashes,on='CRASH_ID',how='inner').distinct()\
            .groupby('VEH_MAKE_ID').count()\
            .orderBy(col('count').desc()).limit(5)

        path = self.config['GOLD_PATH']
        write_df_to_csv(result,path+'10_veh_mkrs_speeding')
        return result

In [23]:
if __name__ =="__main__":
    # Configure logging before any work starts so that messages emitted
    # while the analytics run are handled with this setup.
    logging.basicConfig(level=logging.INFO)
    try:
        obj = ToGold()
        # Run every analysis in order; each one hands its own output
        # to write_df_to_csv.
        obj.accCountMalesKilled()
        obj.twoWheelerCrashes()
        obj.carMakersAirBags()
        obj.hitAndRun()
        obj.stateWithNoFemaleCrashes()
        obj.top3To5VehMakers()
        obj.crashesOnVehAndEth()
        obj.zipCodes_AlcInfluence_Crashes()
        obj.insAvailCrashes()
        obj.veh_mkrs_spding()
    except Exception as e:
        # logging.exception logs at ERROR level and appends the traceback.
        logging.exception("An error occurred: %s", e)