In [169]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc,count,row_number,col
from pyspark.sql.window import Window
spark = SparkSession.builder.master('local').appName("Spark DataFrame").getOrCreate()

In [171]:
df_person = spark.read.options(inferSchema = 'True',header = 'True', delimiter = ',').csv(r"C:\Users\2238345\OneDrive - Cognizant\Yesha\Shashank\Data\Data\Primary_Person_use.csv")
# df_person.printSchema()

# Analysis 1

In [172]:
df_person.filter(col("PRSN_GNDR_ID")=="MALE").count()

96782

# Analysis 2

In [173]:
df_person.select("PRSN_HELMET_ID").filter(~col("PRSN_HELMET_ID").isin(["NOT APPLICABLE"])).count()

911

# Analysis 3

In [175]:
df_unit = spark.read.options(inferSchema = 'True',header = 'True', delimiter = ',').csv(r"C:\Users\2238345\OneDrive - Cognizant\Yesha\Shashank\Data\Data\Units_use.csv")
#df_unit.printSchema()

In [176]:
df2 = df_unit.join(df_person, "CRASH_ID","inner").select("CRASH_ID","PRSN_GNDR_ID","VEH_LIC_STATE_ID").filter(col("PRSN_GNDR_ID")=="FEMALE").groupby("VEH_LIC_STATE_ID").count()
df2 = df2.sort(col('count').desc())
print(df2.take(1)[0])

Row(VEH_LIC_STATE_ID='TX', count=127640)


# Analysis 4

In [177]:
df4 = df_unit.withColumn("Total_Injuries_death",col("TOT_INJRY_CNT")+col("DEATH_CNT")).select("VEH_MAKE_ID","TOT_INJRY_CNT","DEATH_CNT","Total_Injuries_death").orderBy('Total_Injuries_death', ascending=False)
for i, data in enumerate(df4.collect()):
    if i>=5 and i<=15:
        print(data[0])

BLUE BIRD
NORTH AMERICAN BUS
NORTH AMERICAN BUS
LINCOLN
CHEVROLET
VAN HOOL
VAN HOOL
MAZDA
MCI (LES AUTO BUS)
MCI (LES AUTO BUS)
NISSAN


# Analysis 5

In [180]:
df2 = df_unit.join(df_person, "CRASH_ID","inner")\
    .select("CRASH_ID","VEH_BODY_STYL_ID","PRSN_ETHNICITY_ID")\
    .groupby("VEH_BODY_STYL_ID","PRSN_ETHNICITY_ID").agg(count("PRSN_ETHNICITY_ID").alias("num_crashes"))

win = Window.partitionBy("VEH_BODY_STYL_ID").orderBy(desc("num_crashes"))
df2 = df2.select("*", row_number().over(win).alias("RowNum")).filter(col("RowNum")==1).drop("RowNum")
df2.show(20, False)

+---------------------------------+-----------------+-----------+
|VEH_BODY_STYL_ID                 |PRSN_ETHNICITY_ID|num_crashes|
+---------------------------------+-----------------+-----------+
|AMBULANCE                        |WHITE            |97         |
|BUS                              |HISPANIC         |391        |
|FARM EQUIPMENT                   |WHITE            |63         |
|FIRE TRUCK                       |WHITE            |112        |
|MOTORCYCLE                       |WHITE            |848        |
|NA                               |WHITE            |5693       |
|NEV-NEIGHBORHOOD ELECTRIC VEHICLE|WHITE            |10         |
|NOT REPORTED                     |HISPANIC         |2          |
|OTHER  (EXPLAIN IN NARRATIVE)    |WHITE            |459        |
|PASSENGER CAR, 2-DOOR            |WHITE            |9877       |
|PASSENGER CAR, 4-DOOR            |WHITE            |58312      |
|PICKUP                           |WHITE            |38609      |
|POLICE CA

# Analysis 6

In [181]:
filter_string = ["ALCOHOL","DRINKING"]
df6 = df_unit.join(df_person, "CRASH_ID","inner"). \
    where(col("CONTRIB_FACTR_1_ID").rlike('|'.join(filter_string)) | col("CONTRIB_FACTR_2_ID").rlike('|'.join(filter_string))). \
    groupby("DRVR_ZIP").count().orderBy(col("count").desc()).filter(col("DRVR_ZIP") != "null").limit(5)
df6 = df6.withColumnRenamed("count","Highest_number_crashes_with_alcohols")
df6.show()

+--------+------------------------------------+
|DRVR_ZIP|Highest_number_crashes_with_alcohols|
+--------+------------------------------------+
|   78521|                                  78|
|   76010|                                  77|
|   75067|                                  70|
|   78753|                                  61|
|   78741|                                  57|
+--------+------------------------------------+



# Analysis 7

In [182]:
df_damage = spark.read.options(inferSchema = 'True',header = 'True', delimiter = ',').csv(r"C:\Users\2238345\OneDrive - Cognizant\Yesha\Shashank\Data\Data\Damages_use.csv")
# df_damage.printSchema()

df7 = df_damage.join(df_unit, on=["CRASH_ID"], how='inner'). \
    where(((df_unit.VEH_DMAG_SCL_1_ID > "DAMAGED 4")) | ((df_unit.VEH_DMAG_SCL_2_ID > "DAMAGED 4"))). \
    where(df_damage.DAMAGED_PROPERTY == "NONE"). \
    where(df_unit.FIN_RESP_TYPE_ID == "PROOF OF LIABILITY INSURANCE")
df7.show()

+--------+----------------+--------+-------------+-------------+----------+----------------+-----------------+------------+------------+-----------+--------------+--------------------+---------------+--------+-----------------+--------------------+--------------------+-----------------+--------------+------------------+-----------------+--------------+------------------+--------------------+--------------------+--------------------+------------------+-------------------+---------------+---------------------+---------------+------------------+--------------+-------------+--------------+-------------+---------+
|CRASH_ID|DAMAGED_PROPERTY|UNIT_NBR| UNIT_DESC_ID|VEH_PARKED_FL|VEH_HNR_FL|VEH_LIC_STATE_ID|              VIN|VEH_MOD_YEAR|VEH_COLOR_ID|VEH_MAKE_ID|    VEH_MOD_ID|    VEH_BODY_STYL_ID|EMER_RESPNDR_FL|OWNR_ZIP|FIN_RESP_PROOF_ID|    FIN_RESP_TYPE_ID|  VEH_DMAG_AREA_1_ID|VEH_DMAG_SCL_1_ID|FORCE_DIR_1_ID|VEH_DMAG_AREA_2_ID|VEH_DMAG_SCL_2_ID|FORCE_DIR_2_ID|VEH_INVENTORIED_FL|     VEH_T

# Analysis 8

In [183]:
df_charge = spark.read.options(inferSchema = 'True',header = 'True', delimiter = ',').csv(r"C:\Users\2238345\OneDrive - Cognizant\Yesha\Shashank\Data\Data\Charges_use.csv")
# df_charge.printSchema()

top_25_state_list_df = df_unit.groupby("VEH_LIC_STATE_ID").count().orderBy(col("count").desc()). \
    filter(col("VEH_LIC_STATE_ID").cast('int').isNull()).limit(25)
top_25_state_list = top_25_state_list_df.select("VEH_LIC_STATE_ID").rdd.flatMap(lambda x: x).collect()

top_10_used_vehicle_colors_df = df_unit.groupby("VEH_COLOR_ID").count().orderBy(col("count").desc()). \
    where(col("VEH_COLOR_ID").cast('int').isNull()). \
    where(col("VEH_COLOR_ID") != "NA").limit(10)

top_10_used_vehicle_colors = top_10_used_vehicle_colors_df.select("VEH_COLOR_ID").rdd.flatMap(lambda x: x).collect()


In [184]:
df8 = df_charge.join(df_person, 'CRASH_ID','inner'). \
    join(df_unit, 'CRASH_ID', 'inner'). \
    filter(df_charge.CHARGE.contains("SPEED")). \
    filter(df_person.DRVR_LIC_TYPE_ID.isin(["DRIVER LICENSE", "COMMERCIAL DRIVER LIC."])). \
    filter(df_unit.VEH_COLOR_ID.isin(top_10_used_vehicle_colors)). \
    filter(df_unit.VEH_LIC_STATE_ID.isin(top_25_state_list)). \
    groupby("VEH_MAKE_ID").count(). \
    orderBy(col("count").desc()).withColumnRenamed("count","Highest_number_of_offences").limit(5)
df8.show()

+-----------+--------------------------+
|VEH_MAKE_ID|Highest_number_of_offences|
+-----------+--------------------------+
|       FORD|                     19205|
|  CHEVROLET|                     16860|
|     TOYOTA|                     11822|
|      DODGE|                      7935|
|     NISSAN|                      7332|
+-----------+--------------------------+

