### Importing required packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from functools import reduce
from pyspark.sql.types import IntegerType

### Creating the Spark session

In [2]:
# Local-mode Spark session for the FARS pipeline: 4 shuffle partitions, Kryo
# serialization (class registration not enforced) and the HDFS instance at
# localhost:9000 as the default filesystem, so scheme-less paths such as
# "/FARS/..." refer to HDFS.
spark = (
    SparkSession.builder
    .appName("FARS processing")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrationRequired", "false")
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/17 19:39:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Merging files

In [3]:
def get_schema(file_name, year=2020, base_dir="/FARS/Unclean_data"):
    """Return the ordered column names of one FARS auxiliary CSV file.

    The column layout of the ``year`` file (2020 by default) serves as the
    reference schema that the other years are aligned to.

    Parameters
    ----------
    file_name : str
        File stem without extension, e.g. ``"ACC_AUX"``.
    year : int, default 2020
        Year sub-directory whose header is read.
    base_dir : str, default "/FARS/Unclean_data"
        Directory containing one sub-directory per year.

    Returns
    -------
    list of str
        Column names in file order.
    """
    # Only the header names are needed, so schema inference (a full pass over
    # the file) is skipped; the column names are the same either way.
    header_df = (
        spark.read
        .option('delimiter', ',')
        .option('header', 'true')
        .csv(f"{base_dir}/{year}/{file_name}.CSV")
    )
    return header_df.schema.names

In [4]:
def merge_files_for_year(year, file_name, expected_schema=None):
    """Load one year's FARS auxiliary CSV, aligned to the reference columns.

    - String-typed columns are cast to integer. Under Spark's default
      non-ANSI casting, non-numeric strings become null.
    - Columns missing from this year's file are added as all-null integer
      columns.
    - The result has exactly the columns of ``expected_schema``, in that
      order, so the yearly frames can be combined with a positional
      ``union``.

    Parameters
    ----------
    year : int
        Year sub-directory under ``/FARS/Unclean_data/``.
    file_name : str
        File stem without extension, e.g. ``"ACC_AUX"``.
    expected_schema : list of str, optional
        Reference column names. Defaults to ``get_schema(file_name)``.
        Pass it explicitly when loading many years, to avoid re-reading
        the reference file on every call.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    file_path = f"/FARS/Unclean_data/{year}/{file_name}.CSV"

    # load the dataframe for the file
    df = (
        spark.read
        .option('delimiter', ',')
        .option('header', 'true')
        .option('inferSchema', 'true')
        .csv(file_path)
    )

    if expected_schema is None:
        expected_schema = get_schema(file_name)

    # Build every output column in one select instead of one withColumn per
    # column: each withColumn call adds another projection to the plan.
    col_types = dict(df.dtypes)
    aligned_cols = []
    for name in expected_schema:
        if name not in col_types:
            # Column absent from this year's file: fill with typed nulls.
            aligned_cols.append(lit(None).cast(IntegerType()).alias(name))
        elif col_types[name] == 'string':
            aligned_cols.append(df[name].cast('integer').alias(name))
        else:
            aligned_cols.append(df[name])
    return df.select(aligned_cols)



In [5]:
# One aligned DataFrame per survey year (1982 through 2020 inclusive) for each
# of the three auxiliary files.
YEARS = range(1982, 2021)
ACC = [merge_files_for_year(yr, "ACC_AUX") for yr in YEARS]
PER = [merge_files_for_year(yr, "PER_AUX") for yr in YEARS]
VEH = [merge_files_for_year(yr, "VEH_AUX") for yr in YEARS]

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [6]:
def check_schema(dfs):
    """Report whether every DataFrame in ``dfs`` has the same schema as the first.

    For each frame whose schema differs from ``dfs[0].schema``, the frame's
    ``YEAR`` value (taken from its first row) is printed. The success message
    is printed only when no mismatch was found.

    Parameters
    ----------
    dfs : sequence of pyspark.sql.DataFrame
        Frames expected to share one schema. An empty sequence counts as
        matched.

    Returns
    -------
    bool
        True when all schemas match, False otherwise.
    """
    all_matched = True
    if dfs:
        reference = dfs[0].schema
        for df2 in dfs[1:]:
            if df2.schema != reference:
                all_matched = False
                # first() returns None for an empty frame, so guard the lookup.
                first_row = df2.select('YEAR').first()
                year = first_row[0] if first_row is not None else "unknown"
                print("Schema for year not matched: " + str(year))
    if all_matched:
        print("Congrats!. All schemas are matched.")
    return all_matched

In [7]:
# Every yearly list must share one schema before its years are unioned by position.
for yearly_frames in (ACC, PER, VEH):
    check_schema(yearly_frames)

Congrats!. All schemas are matched.
Congrats!. All schemas are matched.
Congrats!. All schemas are matched.


In [8]:
def merge_dfs(dfs):
    """Union a non-empty sequence of DataFrames into one, in order.

    ``DataFrame.union`` matches columns by position, so every frame must have
    the same columns in the same order.

    Parameters
    ----------
    dfs : sequence of pyspark.sql.DataFrame
        Frames to stack. A single frame is returned as-is.

    Returns
    -------
    pyspark.sql.DataFrame

    Raises
    ------
    ValueError
        If ``dfs`` is empty.
    """
    if not dfs:
        raise ValueError("merge_dfs needs at least one DataFrame")
    # reduce also handles a one-element list, unlike indexing dfs[1] directly.
    return reduce(lambda merged, nxt: merged.union(nxt), dfs)

In [9]:
# Stack the yearly frames of each file into one long table per file type.
m_ACC, m_PER, m_VEH = (merge_dfs(frames) for frames in (ACC, PER, VEH))

In [10]:
def shape(df):
    """Print the number of rows and columns of a Spark DataFrame.

    The row count is computed with ``df.count()``, which runs a Spark job
    over the whole frame. Returns None.
    """
    for label, value in (("Number of rows: ", df.count()),
                         ("Number of columns: ", len(df.columns))):
        print(label, value)

In [11]:
# Row and column counts of the merged ACC_AUX table (all years).
shape(m_ACC)

Number of rows:  1418963
Number of columns:  42




In [12]:
# Row and column counts of the merged PER_AUX table (all years).
shape(m_PER)

Number of rows:  3701950
Number of columns:  24




In [13]:
# Row and column counts of the merged VEH_AUX table (all years).
shape(m_VEH)

Number of rows:  2133759
Number of columns:  18


### Selecting required columns and changing column names

In [14]:
# Column names of the 2020 ACC_AUX file, i.e. the reference layout.
print(get_schema("ACC_AUX"))

['ST_CASE', 'YEAR', 'STATE', 'COUNTY', 'FATALS', 'A_CRAINJ', 'A_REGION', 'A_RU', 'A_INTER', 'A_RELRD', 'A_INTSEC', 'A_ROADFC', 'A_JUNC', 'A_MANCOL', 'A_TOD', 'A_DOW', 'A_CT', 'A_WEATHER', 'A_LT', 'A_MC', 'A_SPCRA', 'A_PED', 'A_PED_F', 'A_PEDAL', 'A_PEDAL_F', 'A_ROLL', 'A_POLPUR', 'A_POSBAC', 'A_D15_19', 'A_D16_19', 'A_D15_20', 'A_D16_20', 'A_D65PLS', 'A_D21_24', 'A_D16_24', 'A_RD', 'A_HR', 'A_DIST', 'A_DROWSY', 'BIA', 'SPJ_INDIAN', 'INDIAN_RES']


In [15]:
# (source column in ACC_AUX, output column name) pairs kept in the cleaned
# accident table; list order is the output column order.
# NOTE(review): "Involve_pedestrain" is misspelled, but the decode dictionary
# and the saved parquet use this exact name, so it is intentionally unchanged.
ACC_COLS = [
    ('ST_CASE', 'ST_CASE'),
    ('YEAR', 'YEAR'),
    ('STATE', 'State'),
    ('A_CRAINJ', 'Crash_injury_type'),
    ('A_CT', 'Crash_type'),
    ('A_TOD', 'Time_of_day'),
    ('A_DIST', 'Distracted_driver'),
    ('A_DOW', 'Day_of_week'),
    ('A_DROWSY', 'Drowsy_driver'),
    ('A_HR', 'Hit_run'),
    ('A_INTSEC', 'Intersection'),
    ('A_JUNC', 'Junction'),
    ('A_LT', 'Involve_large_truck'),
    ('A_MANCOL', 'Type_of_collision'),
    ('A_MC', 'Involve_motorcycle'),
    ('A_PED', 'Involve_pedestrain'),
    ('A_POSBAC', 'Positive_blood_alcohol'),
    ('A_RD', 'Roadway_departure'),
    ('A_ROADFC', 'Roadway_func_class'),
    ('A_ROLL', 'Crash_with_rollover'),
    ('A_SPCRA', 'Involve_speeding'),
]

In [16]:
# Column names of the 2020 VEH_AUX file, i.e. the reference layout.
print(get_schema("VEH_AUX"))

['YEAR', 'ST_CASE', 'VEH_NO', 'A_DRDIS', 'A_DRDRO', 'A_VRD', 'A_BODY', 'A_IMP1', 'A_IMP2', 'A_VROLL', 'A_LIC_S', 'A_LIC_C', 'A_CDL_S', 'A_MC_L_S', 'A_SPVEH', 'A_SBUS', 'A_MOD_YR', 'A_FIRE_EXP']


In [17]:
# (source column in VEH_AUX, output column name) pairs kept in the cleaned
# vehicle table; list order is the output column order.
# NOTE(review): VEH_NO is not kept, so vehicles of the same crash cannot be
# told apart after cleaning — confirm this is intended.
VEH_COLS = [
    ('ST_CASE', 'ST_CASE'),
    ('YEAR', 'YEAR'),
    ('A_BODY', 'Veh_body_type'),
    ('A_DRDIS', 'Distracted_driver'),
    ('A_DRDRO', 'Drowsy_driver'),
    ('A_IMP1', 'Ini_impact_point'),
    ('A_LIC_S', 'License_status'),
    ('A_SPVEH', 'Speeding_involved'),
    ('A_SBUS', 'School_bus'),
    ('A_VROLL', 'Rollover'),
]

In [18]:
# Column names of the 2020 PER_AUX file, i.e. the reference layout.
print(get_schema("PER_AUX"))

['A_AGE1', 'A_AGE2', 'A_AGE3', 'A_AGE4', 'A_AGE5', 'A_AGE6', 'A_AGE7', 'A_AGE8', 'A_AGE9', 'ST_CASE', 'VEH_NO', 'PER_NO', 'YEAR', 'A_PTYPE', 'A_RESTUSE', 'A_HELMUSE', 'A_ALCTES', 'A_HISP', 'A_RCAT', 'A_HRACE', 'A_EJECT', 'A_PERINJ', 'A_LOC', 'A_DOA']


In [19]:
# (source column in PER_AUX, output column name) pairs kept in the cleaned
# person table; list order is the output column order.
# NOTE(review): VEH_NO and PER_NO are not kept, so persons cannot be linked to
# a specific vehicle or told apart within a crash — confirm this is intended.
PER_COLS = [
    ('ST_CASE', 'ST_CASE'),
    ('YEAR', 'YEAR'),
    ('A_AGE4', 'Age_group'),
    ('A_ALCTES', 'Alcohol_test'),
    ('A_EJECT', 'Ejected'),
    ('A_HELMUSE', 'Helmet_use'),
    ('A_PERINJ', 'Person_injury_type'),
    ('A_PTYPE', 'Person_type'),
]

### Renaming and selecting the required columns

In [20]:
# Keep only the chosen source columns, renamed to their readable output names.
df_acc = m_ACC.select([col(src).alias(dst) for src, dst in ACC_COLS])
df_veh = m_VEH.select([col(src).alias(dst) for src, dst in VEH_COLS])
df_per = m_PER.select([col(src).alias(dst) for src, dst in PER_COLS])

In [21]:
# Row and column counts of the selected/renamed accident table.
shape(df_acc)

Number of rows:  1418963
Number of columns:  21


In [22]:
# Row and column counts of the selected/renamed vehicle table.
shape(df_veh)

Number of rows:  2133759
Number of columns:  10


In [23]:
# Row and column counts of the selected/renamed person table.
shape(df_per)

Number of rows:  3701950
Number of columns:  8


## Decoding data

In [24]:
# Distinct raw codes of each accident column after ST_CASE, YEAR and State,
# used to decide which code -> label mappings are needed (one Spark job per column).
decode_dict1 = {
    name: df_acc.select(name).distinct().rdd.map(lambda row: row[0]).collect()
    for name in df_acc.columns[3:]
}

                                                                                

In [25]:
# Display the distinct codes collected per accident column.
decode_dict1

{'Crash_injury_type': [1],
 'Crash_type': [1, 3, 2],
 'Time_of_day': [1, 3, 2],
 'Distracted_driver': [1, 2],
 'Day_of_week': [1, 3, 2],
 'Drowsy_driver': [1, 2],
 'Hit_run': [1, 2],
 'Intersection': [1, 3, 2],
 'Junction': [1, 3, 4, 2],
 'Involve_large_truck': [1, 2],
 'Type_of_collision': [1, 6, 3, 5, 4, 7, 2],
 'Involve_motorcycle': [1, 2],
 'Involve_pedestrain': [1, 2],
 'Positive_blood_alcohol': [1, 3, 2],
 'Roadway_departure': [3, 1, 2],
 'Roadway_func_class': [1, 6, 3, 5, 4, 7, 2],
 'Crash_with_rollover': [1, 2],
 'Involve_speeding': [1, 2]}

In [26]:
# Code -> label mappings for the accident columns. The commented-out mappings
# (State, Type_of_collision, Roadway_func_class) are kept for reference only;
# those columns stay as numeric codes.
decode_dict1 = {
#     "STATE": {1: 'Alabama', 2: 'Alaska', 4: 'Arizona', 5: 'Arkansas', 6: 'California', 8: 'Colorado', 9: 'Connecticut',
#               10: 'Delaware', 11: 'District of Columbia', 12: 'Florida', 13: 'Georgia', 15: 'Hawaii', 16: 'Idaho',
#               17: 'Illinois', 18: 'Indiana', 19: 'Iowa', 20: 'Kansas', 21: 'Kentucky', 22: 'Louisiana', 23: 'Maine',
#               24: 'Maryland', 25: 'Massachusetts', 26: 'Michigan', 27: 'Minnesota', 28: 'Mississippi', 29: 'Missouri',
#               30: 'Montana', 31: 'Nebraska', 32: 'Nevada', 33: 'New Hampshire', 34: 'New Jersey', 35: 'New Mexico',
#               36: 'New York', 37: 'North Carolina', 38: 'North Dakota', 39: 'Ohio', 40: 'Oklahoma', 41: 'Oregon',
#               42: 'Pennsylvania', 43: 'Puerto Rico', 44: 'Rhode Island', 45: 'South Carolina', 46: 'South Dakota',
#               47: 'Tennessee', 48: 'Texas', 49: 'Utah', 50: 'Vermont', 51: 'Virginia', 52: 'Virgin Islands',
#               53: 'Washington', 54: 'West Virginia', 55: 'Wisconsin', 56: 'Wyoming' },

    "Crash_injury_type" : {1: "Fatal"},
    "Crash_type": {1: "Single_veh", 2: "Two_veh", 3: "More_than_2"},
    "Time_of_day": {1: "Day", 2: "Night", 3:"Un"},
    "Distracted_driver": {1: "Y", 2: "N"},
    "Day_of_week" : {1: "W_day", 2: "W_end", 3:"Un"},
    "Drowsy_driver": {1: "Y", 2:"N"},
    "Hit_run": {1: "Y", 2:"N"},
    "Intersection": {1: "Y", 2:"N", 3:"Un"},
    "Junction": {1: "Junc", 2: "Non-junc", 3:"Oth", 4:"Un"},
    "Involve_large_truck": {1: "Y", 2:"N"},

#     "Type_of_collision": { 1: 'Non-Collision MVIT',
#                            2: 'Rear-End', 3: 'Head-On', 4: 'Angle', 5: 'Sideswipe',
#                            6: 'Oth', 7: 'Un'},
    "Involve_motorcycle": {1: "Y", 2:"N"},
    "Involve_pedestrain": {1: "Y", 2:"N"},
    "Positive_blood_alcohol": {1:"Y", 2:"N-D", 3:"Un"},
    "Roadway_departure": {1:"Y", 2:"N", 3:"N-D"},
#     "Roadway_func_class" : {1: 'Interstate', 2: 'Fwy/Expwy', 3: 'Principal',
#                             4: 'Minor', 5: 'Collector', 6: 'Local', 7: 'Un'},
    "Crash_with_rollover": {1: "Y", 2:"N"},
    "Involve_speeding": {1: "Y", 2:"N"},

}

# Decode every mapped column in ONE select, using a single CASE WHEN chain per
# column.
# - One withColumn per (column, code) pair would add a projection to the plan
#   on every call.
# - After the first replacement, each later comparison would be string-vs-int.
#   That relies on implicit casting, which can fail under ANSI mode.
# Here every comparison is against the original integer code. Codes without a
# label are kept in string form, and nulls stay null.
decoded_cols = []
for col_name in df_acc.columns:
    mapping = decode_dict1.get(col_name)
    if not mapping:
        decoded_cols.append(col(col_name))
        continue
    label_expr = None
    for code, label in mapping.items():
        condition = col(col_name) == code
        label_expr = when(condition, label) if label_expr is None else label_expr.when(condition, label)
    decoded_cols.append(label_expr.otherwise(col(col_name).cast("string")).alias(col_name))
df_acc = df_acc.select(decoded_cols)

#### We are not decoding a few columns (State, Type_of_collision and Roadway_func_class) because decoding them significantly reduced performance. Instead, we will use their numeric codes directly and map them to labels as and when we need them.

In [27]:
# Distinct raw codes of each vehicle column after ST_CASE and YEAR, used to
# decide which code -> label mappings are needed (one Spark job per column).
decode_dict2 = {
    name: df_veh.select(name).distinct().rdd.map(lambda row: row[0]).collect()
    for name in df_veh.columns[2:]
}

                                                                                

In [28]:
# Display the distinct codes collected per vehicle column.
decode_dict2

{'Veh_body_type': [1, 6, 3, 5, 9, 4, 8, 7, 2],
 'Distracted_driver': [1, 2],
 'Drowsy_driver': [1, 2],
 'Ini_impact_point': [1, 6, 3, 5, 4, 7, 2],
 'License_status': [1, 3, 4, 2],
 'Speeding_involved': [1, 2],
 'School_bus': [1, 3, 2],
 'Rollover': [1, 2]}

In [29]:
# Code -> label mappings for the vehicle columns.
decode_dict2 = {
    "Veh_body_type" : {1: "Pass_car", 2: "Pickup", 3: "Utility", 4: "Van", 5: "Li_Trk-oth",
                       6: "Lrg_Truck", 7: "MC", 8: "Bus", 9: "Un"},

    "Distracted_driver": {1: "Y", 2: "N"},
    "Drowsy_driver": {1: "Y", 2: "N"},
    "Ini_impact_point": {1: 'Non-Coll', 2: 'Front', 3: 'Right_S', 4: 'Rear', 5: 'Left_S', 6: 'Oth', 7: 'Un'},
    "License_status": {1: "Val", 2: "Inval", 3: "Un", 4:"NA"},
    "Speeding_involved": {1: "Y", 2: "N"},
    "School_bus": {1: "Y", 2:"Veh_as_bus", 3:"oth"},
    "Rollover": {1: "Y", 2: "N"}
}

# Decode every mapped column in ONE select, using a single CASE WHEN chain per
# column.
# - One withColumn per (column, code) pair would add a projection to the plan
#   on every call.
# - After the first replacement, each later comparison would be string-vs-int.
#   That relies on implicit casting, which can fail under ANSI mode.
# Here every comparison is against the original integer code. Codes without a
# label are kept in string form, and nulls stay null.
decoded_cols = []
for col_name in df_veh.columns:
    mapping = decode_dict2.get(col_name)
    if not mapping:
        decoded_cols.append(col(col_name))
        continue
    label_expr = None
    for code, label in mapping.items():
        condition = col(col_name) == code
        label_expr = when(condition, label) if label_expr is None else label_expr.when(condition, label)
    decoded_cols.append(label_expr.otherwise(col(col_name).cast("string")).alias(col_name))
df_veh = df_veh.select(decoded_cols)

In [30]:
# Distinct raw codes of each person column after ST_CASE and YEAR, used to
# decide which code -> label mappings are needed (one Spark job per column).
decode_dict3 = {
    name: df_per.select(name).distinct().rdd.map(lambda row: row[0]).collect()
    for name in df_per.columns[2:]
}

                                                                                

In [31]:
# Display the distinct codes collected per person column (None marks missing values).
decode_dict3

{'Age_group': [1, 6, 3, 5, 4, 8, 7, 2],
 'Alcohol_test': [1, 3, 5, 4, 2],
 'Ejected': [1, 3, 2],
 'Helmet_use': [None, 1, 3, 2],
 'Person_injury_type': [1, 6],
 'Person_type': [1, 3, 5, 4, 2]}

In [32]:

# Code -> label mappings for the person columns.
decode_dict3 = {
    "Age_group" : {1: "<16", 2: "16-20", 3: "21-24", 4: "25-34", 5: "35-44", 6: "45-64",
                   7: "65+", 8: "Un"},

    "Alcohol_test" : { 1: 'No-Alc', 2: '+ve_BAC', 3: 'Not_Tested', 4: 'Tested/Unknown',
                       5: 'Un'},

    "Ejected" : { 1: "N", 2: "Y", 3: "Un" },

    "Helmet_use": {1: "Y", 2: "N", 3: "Un"},

    "Person_injury_type": {1: 'Fatal', 2: 'Incapacitating Injured Estimate', 3: 'Nonincapacitating Injured Estimate',
                           4: 'Other Injured Estimate', 5: 'Not Injured Estimate',
                           6: 'Un', 7: 'NA'},

    "Person_type" : { 1: 'Driver', 2: 'Occupant', 3: 'Pedestrian', 4: 'cyclist',
                    5: 'Un' }
}

# Decode every mapped column in ONE select, using a single CASE WHEN chain per
# column.
# - One withColumn per (column, code) pair would add a projection to the plan
#   on every call.
# - After the first replacement, each later comparison would be string-vs-int.
#   That relies on implicit casting, which can fail under ANSI mode.
# Here every comparison is against the original integer code. Codes without a
# label are kept in string form, and nulls (e.g. in Helmet_use) stay null.
decoded_cols = []
for col_name in df_per.columns:
    mapping = decode_dict3.get(col_name)
    if not mapping:
        decoded_cols.append(col(col_name))
        continue
    label_expr = None
    for code, label in mapping.items():
        condition = col(col_name) == code
        label_expr = when(condition, label) if label_expr is None else label_expr.when(condition, label)
    decoded_cols.append(label_expr.otherwise(col(col_name).cast("string")).alias(col_name))
df_per = df_per.select(decoded_cols)

### Checking for null values

In [33]:
# Count the nulls of every accident column in a single aggregation (one Spark
# job instead of one filtered count per column); count() skips nulls, so
# counting when(isNull, 1) yields the number of null rows. Then print the name
# of each column that has any.
acc_null_counts = df_acc.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in df_acc.columns]
).first().asDict()
for i in df_acc.columns:
    null_count = acc_null_counts[i]
    if null_count > 0:
        print(i, null_count)

                                                                                

In [34]:
# Count the nulls of every vehicle column in a single aggregation (one Spark
# job instead of one filtered count per column); count() skips nulls, so
# counting when(isNull, 1) yields the number of null rows. Then print the name
# of each column that has any.
veh_null_counts = df_veh.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in df_veh.columns]
).first().asDict()
for i in df_veh.columns:
    null_count = veh_null_counts[i]
    if null_count > 0:
        print(i, null_count)



23/03/17 19:43:18 WARN DAGScheduler: Broadcasting large task binary with size 14.4 MiB


                                                                                

23/03/17 19:43:52 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB


                                                                                

In [35]:
# Count the nulls of every person column in a single aggregation (one Spark
# job instead of one filtered count per column); count() skips nulls, so
# counting when(isNull, 1) yields the number of null rows. Then print the name
# of each column that has any.
per_null_counts = df_per.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in df_per.columns]
).first().asDict()
for i in df_per.columns:
    null_count = per_null_counts[i]
    if null_count > 0:
        print(i, null_count)

                                                                                

23/03/17 19:44:22 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB


                                                                                

23/03/17 19:44:46 WARN DAGScheduler: Broadcasting large task binary with size 1004.2 KiB


                                                                                

<function col at 0x7fdb18265e10> 1760300
23/03/17 19:45:01 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB


                                                                                

23/03/17 19:45:17 WARN DAGScheduler: Broadcasting large task binary with size 1004.2 KiB


                                                                                

#### Only the Helmet_use column has missing values, because this column was only added in more recent years. Other columns also contain "data not available" codes, but these do not affect the analysis, so no treatment is needed.

## Saving the cleaned data back to HDFS

In [36]:
# Save the cleaned accident table as snappy-compressed Parquet, one
# sub-directory per YEAR, replacing any previous output at this path.
df_acc.write.parquet("/FARS/Clean_data/acc/", mode="overwrite", partitionBy="YEAR", compression="snappy")

23/03/17 19:45:40 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB


[Stage 705:>                                                       (0 + 8) / 39]

23/03/17 19:45:42 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/03/17 19:45:42 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/03/17 19:45:44 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers




23/03/17 19:45:46 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/03/17 19:45:46 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/03/17 19:45:46 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers




23/03/17 19:45:51 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/03/17 19:45:51 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/03/17 19:45:51 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

In [37]:
# Save the cleaned vehicle table as snappy-compressed Parquet, one
# sub-directory per YEAR, replacing any previous output at this path.
df_veh.write.parquet("/FARS/Clean_data/veh/", mode="overwrite", partitionBy="YEAR", compression="snappy")

23/03/17 19:46:03 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


[Stage 706:>                                                       (0 + 8) / 39]

23/03/17 19:46:04 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/03/17 19:46:05 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/03/17 19:46:05 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

In [38]:
# Save the cleaned person table as snappy-compressed Parquet, one
# sub-directory per YEAR, replacing any previous output at this path.
df_per.write.parquet("/FARS/Clean_data/per/", mode="overwrite", partitionBy="YEAR", compression="snappy")

23/03/17 19:46:22 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


                                                                                