<a href="https://colab.research.google.com/github/sajal1302/Study-Material/blob/master/BCG_PYSPARK_CLASS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
! pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 74kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 43.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=8e5dfc50490b45070ced4d363743335af7c4c925b756140bd2311169085e5ffe
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [20]:
from google.colab import drive
# Mount Google Drive at /content/drive; the config and staging paths used by the
# later cells all live under /content/drive/MyDrive.
drive.mount('/content/drive')

Mounted at /content/drive


In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import logging
import json
# Module-level logger used by BaseEntity.load_file to report config problems;
# INFO and above are enabled on this logger (no handler is attached here).
logger = logging.getLogger('BCG-Use-Case')
logger.setLevel(logging.INFO)

class BaseEntity :
  """ Shared loading and cleansing helpers for a single feed.

  The feed is identified by ``entity_name``, which is the key looked up under
  ``bcg_conf['feeds']`` when the file is loaded. """

  def __init__(self , entity_name):
    """ Param : entity_name - key of this feed under 'feeds' in the config """
    self.entity_name = entity_name

  @staticmethod
  def get_spark_session():
    """ Return the Hive-enabled "BCG-Project" Spark session, creating it if needed. """
    return SparkSession.builder.appName("BCG-Project").enableHiveSupport().getOrCreate()

  @staticmethod
  def load_json(file_path):
    """ Read the JSON config at file_path and return the parsed object.

    Param : file_path - path of the JSON config file
    Return : the deserialized config (a dictionary for the configs used here) """
    with open(file_path) as f:
      return json.load(f)

  def load_file(self ,spark ,bcg_conf):
    """ Read the feed configured for this entity and return it as a dataframe.

    Param : spark - Spark session used for the read
            bcg_conf - config dictionary; bcg_conf['feeds'][entity_name] must provide
                       feed_name, feed_location, file_format, delimeter and header
    Return : Spark Dataframe read from feed_location + feed_name with an inferred schema
    Raises : the original lookup/read error, after logging which entity failed """
    try :
      feed_conf = bcg_conf['feeds'][self.entity_name]
      feed_name = feed_conf['feed_name']
      print(feed_name)
      file_loc = feed_conf['feed_location']
      file_format = feed_conf['file_format']
      delimeter = feed_conf['delimeter']
      header = feed_conf['header']
      # feed_location is expected to end with a path separator; the two parts are simply concatenated.
      full_path = file_loc + feed_name

      print(feed_name , file_loc , file_format, full_path )
      return spark.read.load(full_path,
                             format=file_format, sep=delimeter, inferSchema="true",
                             header=header)

    except Exception:
      # Log the traceback and re-raise: returning None here only postponed the failure
      # to the first use of the dataframe, where the real cause was no longer visible.
      logger.exception("Check the Config for the Entity : {} ".format(self.entity_name))
      raise

  def trim_columns(self, df):
    """ Return a copy of df with surrounding whitespace removed.

    Values of string-typed columns are trimmed, and every column name is
    stripped of leading and trailing spaces; other columns pass through unchanged.

    Param : Raw Spark Dataframe as df
    Return : Trimmed Cleansed Spark Dataframe """
    trimmed_cols = [
        trim(col(colm)).alias(colm.strip()) if dtype == 'string' else col(colm).alias(colm.strip())
        for colm, dtype in df.dtypes
    ]
    return df.select(trimmed_cols)

In [48]:
class ChargesUse(BaseEntity) :
  """ Entity used for the charges feed; the feed actually read is selected by entity_name. """

  def __init__(self, entity_name, conf_path):
    """ Param : entity_name - key of this feed under 'feeds' in the config
            conf_path - path of the JSON config file """
    super().__init__(entity_name)
    self.conf_path = conf_path

  def run(self):
    """ Load the config, read the configured feed and return the trimmed dataframe. """
    # Inherited helpers are called through self so that an override in a subclass is honoured.
    spark = self.get_spark_session()
    conf_json = self.load_json(self.conf_path)
    df_raw = self.load_file(spark, conf_json)
    return self.trim_columns(df_raw)

class DamagesUse(BaseEntity) :
  """ Entity used for the damages feed; the feed actually read is selected by entity_name. """

  def __init__(self, entity_name, conf_path):
    """ Param : entity_name - key of this feed under 'feeds' in the config
            conf_path - path of the JSON config file """
    super().__init__(entity_name)
    self.conf_path = conf_path

  def run(self):
    """ Load the config, read the configured feed and return the trimmed dataframe. """
    # Inherited helpers are called through self so that an override in a subclass is honoured.
    spark = self.get_spark_session()
    conf_json = self.load_json(self.conf_path)
    df_raw = self.load_file(spark, conf_json)
    return self.trim_columns(df_raw)

class EndorseUse(BaseEntity) :
  """ Entity used for the endorsements feed; the feed actually read is selected by entity_name. """

  def __init__(self, entity_name, conf_path):
    """ Param : entity_name - key of this feed under 'feeds' in the config
            conf_path - path of the JSON config file """
    super().__init__(entity_name)
    self.conf_path = conf_path

  def run(self):
    """ Load the config, read the configured feed and return the trimmed dataframe. """
    # Inherited helpers are called through self so that an override in a subclass is honoured.
    spark = self.get_spark_session()
    conf_json = self.load_json(self.conf_path)
    df_raw = self.load_file(spark, conf_json)
    return self.trim_columns(df_raw)

class PrimaryPersonUse(BaseEntity) :
  """ Entity used for the primary-person feed; the feed actually read is selected by entity_name. """

  def __init__(self, entity_name, conf_path):
    """ Param : entity_name - key of this feed under 'feeds' in the config
            conf_path - path of the JSON config file """
    super().__init__(entity_name)
    self.conf_path = conf_path

  def run(self):
    """ Load the config, read the configured feed and return the trimmed dataframe. """
    # Inherited helpers are called through self so that an override in a subclass is honoured.
    spark = self.get_spark_session()
    conf_json = self.load_json(self.conf_path)
    df_raw = self.load_file(spark, conf_json)
    return self.trim_columns(df_raw)

class RestrictUse(BaseEntity) :
  """ Entity used for the restrictions feed; the feed actually read is selected by entity_name. """

  def __init__(self, entity_name, conf_path):
    """ Param : entity_name - key of this feed under 'feeds' in the config
            conf_path - path of the JSON config file """
    super().__init__(entity_name)
    self.conf_path = conf_path

  def run(self):
    """ Load the config, read the configured feed and return the trimmed dataframe. """
    # Inherited helpers are called through self so that an override in a subclass is honoured.
    spark = self.get_spark_session()
    conf_json = self.load_json(self.conf_path)
    df_raw = self.load_file(spark, conf_json)
    return self.trim_columns(df_raw)

class UnitUse(BaseEntity) :
  """ Entity used for the units feed; the feed actually read is selected by entity_name. """

  def __init__(self, entity_name, conf_path):
    """ Param : entity_name - key of this feed under 'feeds' in the config
            conf_path - path of the JSON config file """
    super().__init__(entity_name)
    self.conf_path = conf_path

  def run(self):
    """ Load the config, read the configured feed and return the trimmed dataframe. """
    # Inherited helpers are called through self so that an override in a subclass is honoured.
    spark = self.get_spark_session()
    conf_json = self.load_json(self.conf_path)
    df_raw = self.load_file(spark, conf_json)
    return self.trim_columns(df_raw)

In [49]:

# One place to change the config location for every loader in this cell.
CONFIG_PATH = '/content/drive/MyDrive/bcg/config/config.json'

charges_use_df = ChargesUse('Charges_use' , CONFIG_PATH).run()

damages_use_df = DamagesUse('Damages_use' , CONFIG_PATH).run()

endorse_use_df = EndorseUse('Endorse_use' , CONFIG_PATH).run()

primary_person_use_df = PrimaryPersonUse('Primary_Person_use' , CONFIG_PATH).run()

restrict_use_df = RestrictUse('Restrict_use' , CONFIG_PATH).run()

units_use_df = UnitUse('Units_use' , CONFIG_PATH).run()

Charges_use.csv
Charges_use.csv /content/drive/MyDrive/bcg/staging_files/ csv /content/drive/MyDrive/bcg/staging_files/Charges_use.csv
Damages_use.csv
Damages_use.csv /content/drive/MyDrive/bcg/staging_files/ csv /content/drive/MyDrive/bcg/staging_files/Damages_use.csv
Endorse_use.csv
Endorse_use.csv /content/drive/MyDrive/bcg/staging_files/ csv /content/drive/MyDrive/bcg/staging_files/Endorse_use.csv
Primary_Person_use.csv
Primary_Person_use.csv /content/drive/MyDrive/bcg/staging_files/ csv /content/drive/MyDrive/bcg/staging_files/Primary_Person_use.csv
Restrict_use.csv
Restrict_use.csv /content/drive/MyDrive/bcg/staging_files/ csv /content/drive/MyDrive/bcg/staging_files/Restrict_use.csv
Units_use.csv
Units_use.csv /content/drive/MyDrive/bcg/staging_files/ csv /content/drive/MyDrive/bcg/staging_files/Units_use.csv


In [50]:
# Preview the first 10 rows of each loaded feed with untruncated cell values.
charges_use_df.show(n=10, truncate=False)

damages_use_df.show(n=10, truncate=False)

endorse_use_df.show(n=10, truncate=False)

primary_person_use_df.show(n=10, truncate=False)

restrict_use_df.show(n=10, truncate=False)

units_use_df.show(n=10, truncate=False)

+--------+--------+--------+----------------------------------------+------------+
|CRASH_ID|UNIT_NBR|PRSN_NBR|CHARGE                                  |CITATION_NBR|
+--------+--------+--------+----------------------------------------+------------+
|14768622|1       |1       |DRIVING WHILE INTOXICATED               |null        |
|14838637|1       |1       |DWI                                     |1600000015  |
|14838641|1       |1       |RAN RED LIGHT SOLID (TC 544.007)        |L20440      |
|14838641|2       |1       |NO DRIVER'S LICENSE (TC521.025)         |L23141      |
|14838668|1       |1       |DRIVING WHILE INTOXICATED               |TX4IC50SRJD3|
|14838669|2       |1       |DWI W/BAC >.015                         |2015-000006 |
|14838670|1       |1       |DRIVING WHILE INTOXICATED               |2016-000003 |
|14838685|1       |1       |FAILED TO DRIVE SINGLE LANE, NOL, NO INS|138434825   |
|14838693|1       |1       |DRIVING WHILE INTOXICATED               |TX4IC60UKQND|
|148

In [45]:
# NOTE(review): the output recorded below has CRASH_ID / DAMAGED_PROPERTY columns and this
# cell's execution count (45) is lower than the loader cells' (47-49), so the output looks
# stale - re-run after the loaders to confirm what endorse_use_df actually holds.
endorse_use_df.show(10, False)

+--------+---------------------------------+
|CRASH_ID|DAMAGED_PROPERTY                 |
+--------+---------------------------------+
|14768622|MAILBOX                          |
|14768622|YARD, GRASS                      |
|14838668|GUARDRAIL                        |
|14838685|ROAD SIGN                        |
|14838693|2009 MAZDA 3                     |
|14838834|CHAIN LINK FENCE                 |
|14838841|WOODED POLE ON SOUTH SIDE OF LOOP|
|14838842|CITY SIGN FOR TURN LANES         |
|14838877|FENCE-CHAIN LINK                 |
|14838977|LANDSCAPING AND METAL SIGN       |
+--------+---------------------------------+
only showing top 10 rows



In [56]:
#publish_utils_analysis.py

def analysis1(df, show=True , write= False):
  """ Count the distinct crashes in which a person recorded as killed is male.

  Param : df - dataframe with crash_id, prsn_injry_sev_id and prsn_gndr_id columns
          show - print the result when truthy
          write - write the result as CSV under the bcg output folder when truthy
  Return : None """
  is_killed = lower(col('prsn_injry_sev_id')) == 'killed'
  is_male = lower(col('prsn_gndr_id')) == 'male'
  df_analysis = df.where(is_killed & is_male)\
                .select(countDistinct(col('crash_id')).alias("cnt_male_killed"))

  if show:
    df_analysis.show(1000, False)

  if write:
    df_analysis.coalesce(1).write.csv('/content/drive/MyDrive/bcg/output/analysis1.csv')
  
def analysis2(df, show=True , write= False):
  """ Count the distinct crashes involving a unit whose body style is MOTORCYCLE.

  Param : df - dataframe with crash_id and veh_body_styl_id columns
          show - print the result when truthy
          write - write the result as CSV under the bcg output folder when truthy
  Return : None """
  # NOTE(review): only the exact (case-insensitive) value 'MOTORCYCLE' matches - confirm
  # whether other two-wheeler body styles should be counted as well.
  is_motorcycle = upper(col('veh_body_styl_id')) == 'MOTORCYCLE'
  df_analysis = df.where(is_motorcycle).select(countDistinct('crash_id').alias('two_whlr_booked'))

  if show:
    df_analysis.show(1000, False)

  if write:
    df_analysis.coalesce(1).write.csv('/content/drive/MyDrive/bcg/output/analysis2.csv')


def analysis3(df, show=True , write= False):
  """ Find the driver licence state(s) with the most distinct crashes involving females.

  Rows whose licence state is 'na' or 'unknown' (case-insensitive) are ignored.
  States tied for first place are all returned.

  Param : df - dataframe with crash_id, prsn_gndr_id and drvr_lic_state_id columns
          show - print the result when truthy
          write - write the result as CSV under the bcg output folder when truthy
  Return : None """
  is_female = lower(col('prsn_gndr_id')) == 'female'
  has_known_state = ~ lower(col('drvr_lic_state_id')).isin('na', 'unknown')

  # No explicit sort here: the rank window below orders by num_crashes itself.
  fem_crash_state_df = df.where(is_female & has_known_state)\
                      .groupBy(lower(col('drvr_lic_state_id')).alias('drvr_lic_state_id'))\
                      .agg(countDistinct(col('crash_id')).alias("num_crashes"))

  window_crash_rank = Window.orderBy(col('num_crashes').desc())

  df_analysis = fem_crash_state_df.withColumn("rank", rank().over(window_crash_rank))\
                .where("rank = 1").select(col('drvr_lic_state_id').alias("max_female_crash_state"))

  if show:
    df_analysis.coalesce(1).show(1000, False)

  if write:
    df_analysis.coalesce(1).write.csv('/content/drive/MyDrive/bcg/output/analysis3.csv')

def analysis4(df, show=True , write= False):
  """ List the vehicle makes ranked 5th to 15th by total injury count.

  Makes equal to 'na' (case-insensitive) are ignored. dense_rank is used, so makes
  with the same total share a rank and more than eleven rows can come back.

  Param : df - dataframe with veh_make_id and tot_injry_cnt columns
          show - print the result when truthy
          write - write the result as CSV under the bcg output folder when truthy
  Return : None """
  win_spec_veh_inj_cnt = Window.orderBy(col("veh_inj_cnt").desc())

  # `sum` is pyspark.sql.functions.sum (from the wildcard import), not the Python builtin.
  df_analysis = df.where(lower(col('veh_make_id')) != 'na')\
        .groupBy("veh_make_id").agg(sum(col('tot_injry_cnt')).alias("veh_inj_cnt"))\
        .withColumn("rank", dense_rank().over(win_spec_veh_inj_cnt))\
        .select(col('veh_make_id'), col('rank')).where("rank >=5 and rank <= 15")

  if show:
    df_analysis.show(1000, False)

  if write:
    df_analysis.coalesce(1).write.csv('/content/drive/MyDrive/bcg/output/analysis4.csv')

def analysis5(df1, df2 ,show=True , write= False):
  """ For every vehicle body style, find the ethnicity with the most distinct crashes.

  Ethnicities 'NA'/'UNKNOWN' and body styles 'na'/'not reported'/'unknown' are
  ignored (case-insensitive). dense_rank is used, so tied ethnicities are all kept.

  Param : df1 - dataframe with crash_id and prsn_ethnicity_id columns
          df2 - dataframe with crash_id and veh_body_styl_id columns
          show - print the result when truthy
          write - write the result as CSV under the bcg output folder when truthy
  Return : None """
  primary_person_use_fil_df = df1.where(~upper(col('prsn_ethnicity_id')).isin('NA','UNKNOWN'))

  comp_df = df2.where(~lower(col('veh_body_styl_id')).isin('na', 'not reported' ,'unknown'))\
        .join(primary_person_use_fil_df, "crash_id", "inner")\
        .select(col('crash_id'),
                upper(col('veh_body_styl_id')).alias('veh_body_styl_id'),
                upper(col('prsn_ethnicity_id')).alias('prsn_ethnicity_id'))

  # No sort on the aggregate: the window below partitions and orders the rows itself.
  agg_comp_df = comp_df.groupBy(col('veh_body_styl_id'), col('prsn_ethnicity_id'))\
        .agg(countDistinct(col('crash_id')).alias('cnt_dis_crash_id'))

  win_spec_veh_sty_eth = Window.partitionBy(col('veh_body_styl_id')).orderBy(col('cnt_dis_crash_id').desc())

  df_analysis = agg_comp_df.withColumn("rank", dense_rank().over(win_spec_veh_sty_eth))\
        .where(col('rank') == 1)\
        .select(col('veh_body_styl_id'), col('prsn_ethnicity_id').alias('top_prsn_ethnicity_id'))

  if show:
    df_analysis.show(1000, False)

  if write:
    df_analysis.coalesce(1).write.csv('/content/drive/MyDrive/bcg/output/analysis5.csv')

def analysis6(df1, df2 , show=True , write= False):
  """ List the top 5 driver zip codes by distinct crashes with a car and a positive alcohol result.

  A crash qualifies when one of its units has 'CAR' in its body style
  (case-insensitive) and the person row has prsn_alc_rslt_id 'POSITIVE'
  and a non-null drvr_zip. rank() is used, so ties can return more than five rows.

  Param : df1 - dataframe with crash_id and veh_body_styl_id columns
          df2 - dataframe with crash_id, prsn_alc_rslt_id and drvr_zip columns
          show - print the result when truthy
          write - write the result as CSV under the bcg output folder when truthy
  Return : None """
  cntns_car = instr(upper(col('veh_body_styl_id')) , 'CAR') >=1
  units_cntns_car_df = df1.withColumn('cntns_car_flag', cntns_car)\
                      .where(col('cntns_car_flag')).select(col('crash_id')).distinct()

  win_spec_al_zip = Window.orderBy(col('num_tot_crashes').desc())

  # Join the df2 argument, not a notebook-level dataframe, so the function only uses its inputs.
  alc_prsn_df = df2.join(units_cntns_car_df, "crash_id", "inner")\
                .where((upper(col('prsn_alc_rslt_id')) == 'POSITIVE') & (col('drvr_zip').isNotNull()))\
                .groupBy(col('drvr_zip')).agg(countDistinct('crash_id').alias('num_tot_crashes'))\
                .withColumn("rank", rank().over(win_spec_al_zip))

  df_analysis = alc_prsn_df.where(col('rank')<=5)\
                        .select(col('drvr_zip'), col('rank'))

  if show:
    df_analysis.coalesce(1).show(1000, False)

  if write:
    df_analysis.coalesce(1).write.csv('/content/drive/MyDrive/bcg/output/analysis6.csv')

def analysis7(df1, df2, show=True , write= False):
  """ Count distinct insured, heavily damaged crashes that have no row in df2.

  A unit qualifies when the number extracted from veh_dmag_scl_1_id or
  veh_dmag_scl_2_id is above 4, fin_resp_type_id contains 'INSURANCE'
  (case-sensitive) and fin_resp_proof_id is neither 'NA' nor 'NR'. Crashes whose
  crash_id appears in df2 are then removed with a left-anti join.

  Param : df1 - dataframe with crash_id, veh_dmag_scl_1_id, veh_dmag_scl_2_id,
                fin_resp_type_id and fin_resp_proof_id columns
          df2 - dataframe with a crash_id column
          show - print the result when truthy
          write - write the result as CSV under the bcg output folder when truthy
  Return : None """
  cntns_ins = instr(col('fin_resp_type_id') , 'INSURANCE') >=1

  # Filter before projecting, so fin_resp_proof_id is still a column when it is tested.
  units_use_ins = df1\
          .withColumn('veh_dmg_lvl_1',regexp_extract(col('veh_dmag_scl_1_id'), r'(\d+)', 1).cast('bigint'))\
          .withColumn('veh_dmg_lvl_2',regexp_extract(col('veh_dmag_scl_2_id'), r'(\d+)', 1).cast('bigint'))\
          .withColumn('type_id_ins_flg', cntns_ins)\
          .where((~col('fin_resp_proof_id').isin('NA','NR')) & ((col('veh_dmg_lvl_1') >4) | (col('veh_dmg_lvl_2') >4) ) & (col('type_id_ins_flg')))\
          .select(col('crash_id'),col('veh_dmg_lvl_1'), col('veh_dmg_lvl_2'), col('type_id_ins_flg'))

  no_dmg_df = units_use_ins.join(df2, "crash_id", "leftanti")

  df_analysis = no_dmg_df.select(countDistinct(col('crash_id')).alias('cnt_no_dmg'))

  if show:
    df_analysis.show(1000, False)

  if write:
    df_analysis.coalesce(1).write.csv('/content/drive/MyDrive/bcg/output/analysis7.csv')

"""def analysis8(df, show=True , write= False):
  df_analysis = df.where((lower(col('prsn_injry_sev_id')) == 'killed') & (lower(col('prsn_gndr_id')) == 'male'))\
                .select(countDistinct(col('crash_id')))
  if(show == True):
    df_analysis.show(1000, False)

  if(write == True) :
    df_analysis.coalesce(1).write.csv('/content/drive/MyDrive/bcg/output/analysis8.csv')
"""
def main(conf_path='/content/drive/MyDrive/bcg/config/config.json'):
  """ Load every feed and return the cleansed dataframes keyed by entity name.

  Param : conf_path - path of the JSON config file shared by all entities
  Return : dictionary mapping entity name ('Charges_use', 'Damages_use', ...) to the
           dataframe returned by that entity's run() """
  # Insertion order is the load order.
  entity_classes = {
      'Charges_use': ChargesUse,
      'Damages_use': DamagesUse,
      'Endorse_use': EndorseUse,
      'Primary_Person_use': PrimaryPersonUse,
      'Restrict_use': RestrictUse,
      'Units_use': UnitUse,
  }

  # Return the frames so the loaded data is not discarded when the function ends.
  return {entity_name: entity_cls(entity_name, conf_path).run()
          for entity_name, entity_cls in entity_classes.items()}


In [None]:
analysis1(primary_person_use_df)

analysis2(units_use_df)

# analysis3 filters on prsn_gndr_id / drvr_lic_state_id, so it is given the person frame.
analysis3(primary_person_use_df)

analysis4(units_use_df)

analysis5(primary_person_use_df , units_use_df )

analysis6(units_use_df, primary_person_use_df)

analysis7(units_use_df , damages_use_df)

# analysis8 only exists as a disabled draft inside a string literal, so calling it raises
# NameError; the call stays commented out until the function is actually defined.
# analysis8(units_use_df)

In [64]:
# Only analysis7 is executed in this cell; every other call is commented out.
#analysis1(primary_person_use_df)

#analysis2(units_use_df)

#analysis3(primary_person_use_df)

#analysis4(units_use_df)

#analysis5(primary_person_use_df , units_use_df )

#analysis6(units_use_df, primary_person_use_df)

analysis7(units_use_df , damages_use_df)

#analysis8(units_use_df)

+----------+
|cnt_no_dmg|
+----------+
|8849      |
+----------+

