# Road Accident Analysis in Victoria
*FIT5202 Data processing for Big data* | **Phase 2: Data Wrangling and Exploration** 

<a class="anchor" id="toc"></a>
## Table of Contents

* [Introduction](#intro)
* [Import library](#import)
* [Initialize SparkSession](#initialize)
* [Data Loading and Preparation](#data_loading)
* [Data Exploration](#data_exploration)
    * [accident](#explore_accident)
    * [accident_chainage](#explore_accident_chainage)
    * [accident_event](#explore_accident_event)
    * [accident_location](#explore_accident_location)
    * [atmospheric_condition](#explore_atmosp_cond)
    * [node](#explore_node)
    * [node_id](#explore_node_id)
    * [person](#explore_person)
    * [surface_condition](#explore_surface_cond)
    * [subdca](#explore_subdca)
    * [vehicle](#explore_vehicle)
* [Data Cleaning & Transformation](#data_transformation)
    * [Part 1: Clean and Transform individual dataframe/dataset](#transform_part1) 
        * [atmospheric_condition](#transform_atmosp_cond) 
        * [surface_condition](#transform_surface_cond) 
        * [person](#transform_person) 
        * [node](#transform_node) 
        * [accident](#transform_accident) 
        * [vehicle](#transform_vehicle) 
    * [Part 2: Join all the clean dataframes into a joined dataframe](#transform_part2) 
    * [Part 3: Perform exploration, cleaning & transformation on the joined dataframe](#transform_part3)    
    * [Summary](#transformation_summary)
* [Export clean dataset](#export_final)

<a class="anchor" id="intro"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h2>Introduction</h2>
<hr/>
</div>

The aim of this project is to deliver a predictive model that helps road safety partners minimise the risks associated with roads, vehicles, speed and drivers within the community. The model will give insight into the situations in which road accidents are more likely to occur, so that policy makers can make better decisions on how to reduce the incidence of serious injury and/or death resulting from road accidents.   

------

In this exercise, data cleansing and data wrangling are performed on the 11 files downloaded from the https://www.data.vic.gov.au/ website. A brief description of each file is given below:

[DATASETS](https://discover.data.vic.gov.au/dataset/crash-statistics) :
  1. **accident** - details of each accident, such as the date
  2. **accident_chainage** - route and chainage data, such as route number, route link number etc. 
  3. **accident_event** - details of the events that occurred in each accident, such as a collision, a fall from a vehicle etc. 
  4. **accident_location** - details of the local location of the accident, such as road name, road type etc.
  5. **atmospheric_condition** - describes the weather conditions during the accident.
  6. **node** - the area where the accident occurred, such as region, LGA etc.
  7. **node_id_complex_int_id** - a table linking the accident and node tables.
  8. **person** - details of each person involved in each accident, such as age, whether a seatbelt was worn etc.
  9. **surface_condition** - the surface conditions of the road, such as dry, wet, muddy etc.
  10. **subdca** - details of how the accident occurred. Refer to the [VicRoads Crash Stats User Guide and Appendices](https://data.vicroads.vic.gov.au/metadata/Crashstats_user_Guide_and_Appendices.pdf) for a visual explanation of each code. 
  11. **vehicle** - details of each vehicle involved in each accident, such as make, model and year

Some of the cleansing and wrangling tasks performed are:
* correct data types
* remove duplicates
* fill in missing values
* data validation
* remove outliers

The tables will then be joined to form a master table, which is further explored, cleansed and transformed into the format and shape required by the Machine Learning models in the next phase. The selection of relevant variables is justified in detail, to give the Machine Learning model the best chance of producing accurate predictions.

**META-DATA REFERENCES:**  

[VicRoads Crash Stats User Guide and Appendices](https://data.vicroads.vic.gov.au/metadata/Crashstats_user_Guide_and_Appendices.pdf)   
[Metadata for Crash Stats Data Extract](https://data.vicroads.vic.gov.au/Metadata/Crash%20Stats%20-%20Data%20Extract%20-%20Open%20Data.html)

<a class="anchor" id="import"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h2>Import library</h2>
<hr/>
</div>

In [None]:
# import library
import pandas as pd
from pyspark.sql.functions import col, countDistinct, isnan, when, count, regexp_replace, regexp_extract, substring, trim, first, round, spark_partition_id, asc, desc, broadcast
from pyspark.sql import functions as F, Window
from pyspark.sql.types import StructType, StringType, IntegerType, BooleanType, DoubleType,TimestampType


## Display all columns for pandas DataFrames
# TURN ON during the exploration phase.
## TURN OFF these options before submission for better performance
pd.set_option('display.max_columns', None)
#pd.set_option('display.max_rows', None)

<a class="anchor" id="initialize"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h2>Initialize Spark Session</h2>
<hr/>
</div>

In [None]:
# Import SparkConf class into program
from pyspark import SparkConf

# Run Spark in local mode, using as many worker threads as there are logical cores.
master = "local[*]"

# The `appName` field is a name to be shown on the Spark cluster UI page
app_name = "Road Accident Analysis in Victoria"

# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)
# Alternative: force higher resource settings when running on a machine with higher specs
# spark_conf = SparkConf().setMaster(master).setAppName(app_name).setAll([('spark.executor.memory', '12g'), ('spark.executor.cores', '6'), ('spark.cores.max', '6'), ('spark.driver.memory','12g')])

# Import SparkContext and SparkSession classes
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

# Create SparkSession
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

In [None]:
# get SparkSession info
sc.getConf().getAll()

[Back to Table of Contents](#toc)

<a class="anchor" id="data_loading"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h2>Data loading and Preparation</h2>
<hr/>
    
This section focuses on loading the files. The tasks involve:  

- specifying the data schema for each dataframe/dataset in advance, so that data types are correct for later transformation and calculation operations and values load correctly for the next stages of exploration. The data types were identified by first loading the files without a schema and inspecting them
- loading the files  
- understanding the structure of each file, including the number of attributes/columns and the number of records/rows
    
</div>
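The effect of an explicit schema can be illustrated outside Spark. The following is a minimal plain-Python sketch, not Spark's implementation: it casts each CSV field to a declared type, skips the header row in favour of the schema names, and turns values it cannot cast into `None`, which is roughly what Spark's default `PERMISSIVE` mode does with fields it cannot parse. The three-column mini-schema, the `parse_with_schema` helper and the sample rows are made up for illustration.

```python
import csv
import io

# Hypothetical mini-schema: (column name, Python type). Names mirror a few
# columns of ACCIDENT.csv; the sample data below is made up.
SCHEMA = [("ACCIDENT_NO", str), ("NO_PERSONS", int), ("SPEED_ZONE", str)]


def parse_with_schema(text, schema):
    """Parse CSV text, casting each field to its declared type.

    The header row is skipped and the schema names are used instead.
    Empty fields and fields that cannot be cast become None.
    """
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header; schema names replace it
    rows = []
    for record in reader:
        row = {}
        for (name, cast), raw in zip(schema, record):
            if raw == "":
                row[name] = None
                continue
            try:
                row[name] = cast(raw)
            except ValueError:
                row[name] = None  # malformed value -> null
        rows.append(row)
    return rows


sample = "accident no,no persons,speed zone\nT1,3,60\nT2,n/a,100\n"
rows = parse_with_schema(sample, SCHEMA)
print(rows)
```

Note that `NO_PERSONS` for the second row ends up as `None` rather than the string `"n/a"`, so a malformed value surfaces as a null in the later missing-value checks instead of silently becoming text in a numeric column.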

In [None]:
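## Import the 11 files for exploration ##

# Import: ACCIDENT.csv

# Define the schema explicitly; with header=True the schema's column names replace the CSV header names (e.g. lower-case names or names containing spaces).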
accident_schema = StructType() \
    .add("ACCIDENT_NO",StringType()) \
    .add("ACCIDENTDATE",StringType()) \
    .add("ACCIDENTTIME",StringType()) \
    .add("ACCIDENT_TYPE",IntegerType()) \
    .add("ACCIDENT_TYPE_DESC",StringType()) \
    .add("DAY_OF_WEEK",IntegerType()) \
    .add("DAY_OF_WEEK_DESC",StringType()) \
    .add("DCA_CODE",IntegerType()) \
    .add("DCA_DESC",StringType()) \
    .add("DIRECTORY",StringType()) \
    .add("EDITION",StringType()) \
    .add("PAGE",StringType()) \
    .add("GRID_REFERENCE_X",StringType()) \
    .add("GRID_REFERENCE_Y",StringType()) \
    .add("LIGHT_CONDITION",IntegerType()) \
    .add("LIGHT_CONDITION_DESC",StringType()) \
    .add("NODE_ID",IntegerType()) \
    .add("NO_OF_VEHICLES",IntegerType()) \
    .add("NO_PERSONS",IntegerType()) \
    .add("NO_PERSONS_INJ_2",IntegerType()) \
    .add("NO_PERSONS_INJ_3",IntegerType()) \
    .add("NO_PERSONS_KILLED",IntegerType()) \
    .add("NO_PERSONS_NOT_INJ",IntegerType()) \
    .add("POLICE_ATTEND",IntegerType()) \
    .add("ROAD_GEOMETRY",IntegerType()) \
    .add("ROAD_GEOMETRY_DESC",StringType()) \
    .add("SEVERITY",IntegerType()) \
    .add("SPEED_ZONE",StringType()) 


df_accident = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(accident_schema)\
            .load('data/ACCIDENT.csv')\
            .cache()

In [None]:

# Import: ACCIDENT_CHAINAGE.csv
accident_chainage_schema = StructType() \
    .add("NODE_ID",IntegerType())\
    .add("ROUTE_NO", IntegerType())\
    .add("CHAINAGE_SEQ", IntegerType())\
    .add("ROUTE_LINK_NO",IntegerType())\
    .add("CHAINAGE",IntegerType())

df_accident_chainage = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(accident_chainage_schema)\
            .load('data/ACCIDENT_CHAINAGE.csv')\
            .cache()

In [None]:
# Import: ACCIDENT_EVENT.csv

accident_event_schema = StructType()\
    .add("ACCIDENT_NO",StringType())\
    .add("EVENT_SEQ_NO",IntegerType())\
    .add("EVENT_TYPE", StringType())\
    .add("EVENT_TYPE_DESC", StringType())\
    .add("VEHICLE_1_ID", StringType())\
    .add("VEHICLE_1_COLL_PT", StringType())\
    .add("VEHICLE_1_COLL_PT_DESC", StringType())\
    .add("VEHICLE_2_ID", StringType())\
    .add("VEHICLE_2_COLL_PT", StringType())\
    .add("VEHICLE_2_COLL_PT_DESC", StringType())\
    .add("PERSON_ID", StringType())\
    .add("OBJECT_TYPE", StringType())\
    .add("OBJECT_TYPE_DESC", StringType())

df_accident_event = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(accident_event_schema)\
            .load('data/ACCIDENT_EVENT.csv')\
            .cache()

In [None]:
# Import: ACCIDENT_LOCATION.csv
accident_location_schema = StructType()\
    .add("ACCIDENT_NO",StringType())\
    .add("NODE_ID", IntegerType())\
    .add("ROAD_ROUTE_1", IntegerType())\
    .add("ROAD_NAME", StringType())\
    .add("ROAD_TYPE", StringType())\
    .add("ROAD_NAME_INT", StringType())\
    .add("ROAD_TYPE_INT", StringType())\
    .add("DISTANCE_LOCATION", IntegerType())\
    .add("DIRECTION_LOCATION", StringType())\
    .add("NEAREST_KM_POST", StringType())\
    .add("OFF_ROAD_LOCATION", StringType())

df_accident_location = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(accident_location_schema)\
            .load('data/ACCIDENT_LOCATION.csv')\
            .cache()

In [None]:

# Import: ATMOSPHERIC_COND.csv
atmos_cond_schema = StructType()\
    .add("ACCIDENT_NO",StringType())\
    .add("ATMOSPH_COND",StringType())\
    .add("ATMOSPH_COND_SEQ",IntegerType())\
    .add("ATMOSPH_COND_DESC",StringType())


df_atmospheric_cond = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(atmos_cond_schema)\
            .load('data/ATMOSPHERIC_COND.csv')\
            .cache()

In [None]:

# Import: NODE.csv

node_schema = StructType()\
    .add("ACCIDENT_NO", StringType())\
    .add("NODE_ID", IntegerType())\
    .add("NODE_TYPE", StringType())\
    .add("VICGRID94_X", DoubleType())\
    .add("VICGRID94_Y", DoubleType())\
    .add("LGA_NAME", StringType())\
    .add("LGA_NAME_ALL", StringType())\
    .add("REGION_NAME", StringType())\
    .add("DEG_URBAN_NAME", StringType())\
    .add("LAT", DoubleType())\
    .add("LONG", DoubleType())\
    .add("POSTCODE_NO", IntegerType())

df_node = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(node_schema)\
            .load('data/NODE.csv')\
            .cache()

In [None]:
# Import: NODE_ID_COMPLEX_INT_ID.csv

node_id_schema = StructType()\
    .add("ACCIDENT_NO",StringType())\
    .add("NODE_ID",IntegerType())\
    .add("COMPLEX_INT_NO",IntegerType())


df_node_id = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(node_id_schema)\
            .load('data/NODE_ID_COMPLEX_INT_ID.csv')\
            .cache()

In [None]:
# Import: PERSON.csv

person_schema = StructType()\
    .add("ACCIDENT_NO",StringType())\
    .add("PERSON_ID",StringType())\
    .add("VEHICLE_ID",StringType())\
    .add("SEX",StringType())\
    .add("AGE",IntegerType())\
    .add("AGE_GROUP",StringType())\
    .add("INJ_LEVEL",IntegerType())\
    .add("INJ_LEVEL_DESC",StringType())\
    .add("SEATING_POSITION",StringType())\
    .add("HELMET_BELT_WORN",IntegerType())\
    .add("ROAD_USER_TYPE",IntegerType())\
    .add("ROAD_USER_TYPE_DESC",StringType())\
    .add("LICENCE_STATE",StringType())\
    .add("PEDEST_MOVEMENT",IntegerType())\
    .add("POSTCODE",IntegerType())\
    .add("TAKEN_HOSPITAL",StringType())\
    .add("EJECTED_CODE",IntegerType())

df_person = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(person_schema)\
            .load('data/PERSON.csv')\
            .cache()


In [None]:
# Import: ROAD_SURFACE_COND.csv

surface_cond_schema = StructType()\
    .add("ACCIDENT_NO", StringType())\
    .add("SURFACE_COND", IntegerType())\
    .add("SURFACE_COND_DESC", StringType())\
    .add("SURFACE_COND_SEQ",IntegerType())

df_surface_cond = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(surface_cond_schema)\
            .load('data/ROAD_SURFACE_COND.csv')\
            .cache()

In [None]:
# Import: SUBDCA.csv

subdca_schema = StructType()\
    .add("ACCIDENT_NO", StringType())\
    .add("SUB_DCA_CODE", StringType())\
    .add("SUB_DCA_SEQ", IntegerType())\
    .add("SUB_DCA_CODE_DESC",StringType())

df_subdca = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(subdca_schema)\
            .load('data/SUBDCA.csv')\
            .cache()

In [None]:

# Import: VEHICLE.csv
vehicle_schema = StructType()\
    .add("ACCIDENT_NO", StringType())\
    .add("VEHICLE_ID", StringType())\
    .add("VEHICLE_YEAR_MANUF", StringType())\
    .add("VEHICLE_DCA_CODE", StringType())\
    .add("INITIAL_DIRECTION", StringType())\
    .add("ROAD_SURFACE_TYPE", IntegerType())\
    .add("ROAD_SURFACE_TYPE_DESC", StringType())\
    .add("REG_STATE", StringType())\
    .add("VEHICLE_BODY_STYLE", StringType())\
    .add("VEHICLE_MAKE", StringType())\
    .add("VEHICLE_MODEL", StringType())\
    .add("VEHICLE_POWER", IntegerType())\
    .add("VEHICLE_TYPE", StringType())\
    .add("VEHICLE_TYPE_DESC", StringType())\
    .add("VEHICLE_WEIGHT", IntegerType())\
    .add("CONSTRUCTION_TYPE", StringType())\
    .add("FUEL_TYPE", StringType())\
    .add("NO_OF_WHEELS", IntegerType())\
    .add("NO_OF_CYLINDERS", IntegerType())\
    .add("SEATING_CAPACITY", IntegerType())\
    .add("TARE_WEIGHT", IntegerType())\
    .add("TOTAL_NO_OCCUPANTS", IntegerType())\
    .add("CARRY_CAPACITY", IntegerType())\
    .add("CUBIC_CAPACITY", IntegerType())\
    .add("FINAL_DIRECTION", StringType())\
    .add("DRIVER_INTENT", StringType())\
    .add("VEHICLE_MOVEMENT", StringType())\
    .add("TRAILER_TYPE", StringType())\
    .add("VEHICLE_COLOUR_1", StringType())\
    .add("VEHICLE_COLOUR_2", StringType())\
    .add("CAUGHT_FIRE", StringType())\
    .add("INITIAL_IMPACT", StringType())\
    .add("LAMPS", IntegerType())\
    .add("LEVEL_OF_DAMAGE", IntegerType())\
    .add("OWNER_POSTCODE", StringType())\
    .add("TOWED_AWAY_FLAG", StringType())\
    .add("TRAFFIC_CONTROL", StringType())\
    .add("TRAFFIC_CONTROL_DESC", StringType())

df_vehicle = spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .schema(vehicle_schema)\
            .load('data/VEHICLE.csv')\
            .cache()

In [None]:
### This section is turned off for assignment submission, uncomment as required

# This section was used to verify:
# - the first load (without a schema)
# - the load result (after the schema was defined)

# check the structure of every dataframe 
# print("df_accident")
# df_accident.printSchema()

# print("df_accident_chainage")
# df_accident_chainage.printSchema()

# print("df_accident_event")
# df_accident_event.printSchema()

# print("df_accident_location")
# df_accident_location.printSchema()

# print("df_atmospheric_cond")
# df_atmospheric_cond.printSchema()

# print("df_node")
# df_node.printSchema()

# print("df_node_id")
# df_node_id.printSchema()

# print("df_person")
# df_person.printSchema()

# print("df_surface_cond")
# df_surface_cond.printSchema()

# print("df_subdca")
# df_subdca.printSchema()

# print("df_vehicle")
# df_vehicle.printSchema()

In [None]:
df_shape_initial = spark.createDataFrame(
    [
        ("df_accident", "Number of Records", df_accident.count() ), 
        ("df_accident", "Number of Columns", len(df_accident.columns)), 
        ("df_accident_chainage", "Number of Records", df_accident_chainage.count() ), 
        ("df_accident_chainage", "Number of Columns", len(df_accident_chainage.columns)), 
        ("df_accident_event", "Number of Records", df_accident_event.count() ), 
        ("df_accident_event", "Number of Columns", len(df_accident_event.columns)), 
        ("df_accident_location", "Number of Records", df_accident_location.count() ), 
        ("df_accident_location", "Number of Columns", len(df_accident_location.columns)), 
        ("df_atmospheric_cond", "Number of Records", df_atmospheric_cond.count() ), 
        ("df_atmospheric_cond", "Number of Columns", len(df_atmospheric_cond.columns)), 
        ("df_node", "Number of Records", df_node.count() ), 
        ("df_node", "Number of Columns", len(df_node.columns)), 
        ("df_node_id", "Number of Records", df_node_id.count() ), 
        ("df_node_id", "Number of Columns", len(df_node_id.columns)), 
        ("df_person", "Number of Records", df_person.count() ), 
        ("df_person", "Number of Columns", len(df_person.columns)), 
        ("df_surface_cond", "Number of Records", df_surface_cond.count() ), 
        ("df_surface_cond", "Number of Columns", len(df_surface_cond.columns)), 
        ("df_subdca", "Number of Records", df_subdca.count() ), 
        ("df_subdca", "Number of Columns", len(df_subdca.columns)), 
        ("df_vehicle", "Number of Records", df_vehicle.count() ), 
        ("df_vehicle", "Number of Columns", len(df_vehicle.columns)), 
    ],
    ["DATAFRAME", "SHAPE","INITIAL RESULTS"] 
)

In [None]:
# check the shape of the dataframe
df_shape_initial.groupBy("DATAFRAME").pivot("SHAPE").sum().sort("Number of Records", ascending=False).show()

[Back to Table of Contents](#toc)

 <a class="anchor" id="data_exploration"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h2>Data Exploration</h2>
<hr/>

This section focuses on exploring each dataset individually. The activities performed include, but are not limited to:
- computing summary statistics for each column, including the number of records, the number of unique values and the types of values/inputs  
- understanding the records  
- checking for missing/null values   
- checking the data distribution of each column   
- checking for duplicates  
- checking for inconsistent/invalid data  

</div>
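The checks listed above boil down to a few counts per column plus a duplicate check on the dataset's key. As a rough illustration, here is a plain-Python sketch over a list of dictionaries standing in for dataframe rows; the `profile` helper and the sample rows are hypothetical. Like Spark's `countDistinct`, the distinct count below ignores nulls.

```python
from collections import Counter


def profile(rows, key_cols):
    """Summarise a list of dict rows.

    Returns (missing, distinct, duplicates):
      missing    - number of None values per column
      distinct   - number of distinct non-None values per column
      duplicates - key tuples that occur more than once, with their counts
    """
    columns = list(rows[0].keys()) if rows else []
    missing = {c: sum(1 for r in rows if r.get(c) is None) for c in columns}
    distinct = {c: len({r.get(c) for r in rows} - {None}) for c in columns}
    key_counts = Counter(tuple(r.get(k) for k in key_cols) for r in rows)
    duplicates = {k: n for k, n in key_counts.items() if n > 1}
    return missing, distinct, duplicates


# Made-up event-like rows keyed on (ACCIDENT_NO, EVENT_SEQ_NO)
rows = [
    {"ACCIDENT_NO": "A1", "EVENT_SEQ_NO": 1, "EVENT_TYPE": "C"},
    {"ACCIDENT_NO": "A1", "EVENT_SEQ_NO": 2, "EVENT_TYPE": None},
    {"ACCIDENT_NO": "A1", "EVENT_SEQ_NO": 2, "EVENT_TYPE": "F"},
]
missing, distinct, duplicates = profile(rows, ["ACCIDENT_NO", "EVENT_SEQ_NO"])
print(missing, distinct, duplicates)
```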

 <a class="anchor" id="explore_accident"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> accident</strong></div> 

In [None]:
# Summary statistics: df_accident
df_accident.describe().toPandas()

In [None]:
# view some data
#df_accident.show(2,vertical=True,truncate=False)
df_accident.limit(5).toPandas()

Expected relationship between the person-count columns:  
`NO_PERSONS` = `NO_PERSONS_INJ_2` (Injury level 2) + `NO_PERSONS_INJ_3` (Injury level 3) + `NO_PERSONS_KILLED` + `NO_PERSONS_NOT_INJ` (No Injury)
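A worked example of this identity helps when reading the variance checks further down. The sketch below is plain Python on made-up values; `person_count_variance` is a hypothetical helper that returns 0 when the four injury-level counts add up to `NO_PERSONS` and a non-zero difference otherwise.

```python
# The four injury-level columns that should add up to NO_PERSONS
PARTS = ("NO_PERSONS_INJ_2", "NO_PERSONS_INJ_3",
         "NO_PERSONS_KILLED", "NO_PERSONS_NOT_INJ")


def person_count_variance(accident):
    """Sum of the four injury-level counts minus NO_PERSONS (0 = consistent)."""
    return sum(accident[p] for p in PARTS) - accident["NO_PERSONS"]


# Made-up accident: 1 at injury level 2 + 1 at level 3 + 0 killed + 1 not injured
consistent = {"NO_PERSONS": 3, "NO_PERSONS_INJ_2": 1, "NO_PERSONS_INJ_3": 1,
              "NO_PERSONS_KILLED": 0, "NO_PERSONS_NOT_INJ": 1}
inconsistent = dict(consistent, NO_PERSONS=2)  # same parts, wrong total

print(person_count_variance(consistent))    # 0
print(person_count_variance(inconsistent))  # 1
```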

In [None]:
# checking missing/null values : df_accident
df_accident.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_accident.columns]).show(2,vertical=True,truncate=False)

`DIRECTORY`, `EDITION`, `PAGE`, `GRID_REFERENCE_X`, `GRID_REFERENCE_Y` (map-related data) contain missing values.

In [None]:
# check the number of unique values in each column
df_accident.select([countDistinct(c).alias(c) for c in df_accident.columns]).show(vertical=True,truncate=False)

`ACCIDENT_NO` is unique for every record.  
Every accident is assigned a `SEVERITY` class.  
`DIRECTORY`, `EDITION`, `PAGE`, `GRID_REFERENCE_X`, `GRID_REFERENCE_Y` (map-related data) contain missing values.  
Several code/description pairs are provided:  
- `ACCIDENT_TYPE` & `ACCIDENT_TYPE_DESC`  
- `DCA_CODE` & `DCA_DESC`  
- `LIGHT_CONDITION` & `LIGHT_CONDITION_DESC`  
- `ROAD_GEOMETRY` & `ROAD_GEOMETRY_DESC`
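Each code/description pair should map one-to-one: every code to a single description and every description to a single code. A minimal plain-Python sketch of that check follows; `inconsistent_pairs` is a hypothetical helper and the sample rows are invented (the codes are not necessarily the real `DAY_OF_WEEK` codes). In Spark, grouping by the code and counting distinct descriptions with `countDistinct` (and vice versa) expresses the same idea.

```python
from collections import defaultdict


def inconsistent_pairs(rows, code_col, desc_col):
    """Find codes with several descriptions and descriptions with several codes."""
    code_to_desc = defaultdict(set)
    desc_to_code = defaultdict(set)
    for r in rows:
        code_to_desc[r[code_col]].add(r[desc_col])
        desc_to_code[r[desc_col]].add(r[code_col])
    bad_codes = {c: d for c, d in code_to_desc.items() if len(d) > 1}
    bad_descs = {d: c for d, c in desc_to_code.items() if len(c) > 1}
    return bad_codes, bad_descs


# Made-up rows: "Friday" appears with two different codes
rows = [
    {"DAY_OF_WEEK": 5, "DAY_OF_WEEK_DESC": "Friday"},
    {"DAY_OF_WEEK": 2, "DAY_OF_WEEK_DESC": "Friday"},
    {"DAY_OF_WEEK": 2, "DAY_OF_WEEK_DESC": "Monday"},
]
bad_codes, bad_descs = inconsistent_pairs(rows, "DAY_OF_WEEK", "DAY_OF_WEEK_DESC")
print(bad_codes)
print(bad_descs)
```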

In [None]:
# check the frequency of each value in each column
for c in df_accident.columns:
    df_accident.groupBy(c).count().sort(col("count").desc()).show(truncate=False)

`ACCIDENTTIME` contains trailing spaces.  
`DIRECTORY`, `EDITION`, `PAGE`, `GRID_REFERENCE_X`, `GRID_REFERENCE_Y` (map-related data) contain strings made up of spaces.  
`NODE_ID` contains the values -1, -10 and -3, which need further exploration.  
`SPEED_ZONE` needs more descriptive values.  
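Trailing spaces and space-only strings are both whitespace problems and can be handled by one rule: trim every string and treat an empty result as missing. The plain-Python sketch below shows that rule on a made-up record; `normalise_blank` is a hypothetical helper. In Spark, the same rule could be expressed per column with `F.trim` combined with `F.when(...).otherwise(...)`.

```python
def normalise_blank(value):
    """Trim strings and turn empty or whitespace-only strings into None."""
    if isinstance(value, str):
        stripped = value.strip()
        return stripped if stripped else None
    return value


# Made-up record showing a trailing space and blank map fields
record = {"ACCIDENTTIME": "10.45.00 ", "DIRECTORY": " ", "PAGE": "  ", "NODE_ID": 42}
clean = {k: normalise_blank(v) for k, v in record.items()}
print(clean)
```

Converting blanks to `None` means the existing `isNull()` missing-value checks will then count them, which they do not do for `" "` strings.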

In [None]:
# check duplicate
df_accident.groupBy("ACCIDENT_NO").count().filter("count > 1").show()

No duplicate records.

In [None]:
# check pairing values: "ACCIDENT_TYPE" & "ACCIDENT_TYPE_DESC"
df_accident.groupBy("ACCIDENT_TYPE","ACCIDENT_TYPE_DESC").count().sort(col("ACCIDENT_TYPE")).show(truncate=False)

In [None]:
# check pairing values: "DCA_CODE" & "DCA_DESC"
df_accident.groupBy("DCA_CODE","DCA_DESC").count().sort(col("DCA_CODE")).show(truncate=False)

In [None]:
# check pairing values: "LIGHT_CONDITION" & "LIGHT_CONDITION_DESC"
df_accident.groupBy("LIGHT_CONDITION","LIGHT_CONDITION_DESC").count().sort(col("LIGHT_CONDITION")).show(truncate=False)

In [None]:
# check pairing values: "ROAD_GEOMETRY" & "ROAD_GEOMETRY_DESC"
df_accident.groupBy("ROAD_GEOMETRY","ROAD_GEOMETRY_DESC").count().sort(col("ROAD_GEOMETRY")).show(truncate=False)

In [None]:
# check pairing values: DAY_OF_WEEK & DAY_OF_WEEK_DESC
df_accident.groupBy("DAY_OF_WEEK","DAY_OF_WEEK_DESC").count().sort(col("DAY_OF_WEEK_DESC")).show(5,truncate=False)

The description "Friday" is paired with more than one `DAY_OF_WEEK` value.

In [None]:
# check Friday records whose DAY_OF_WEEK is not 5 (select only DAY_OF_WEEK = 2 and 4, which have fewer records)
df_accident.filter((col("DAY_OF_WEEK_DESC") == 'Friday') & (col("DAY_OF_WEEK").isin(2, 4)))\
            .select(col('ACCIDENTDATE'),col('DAY_OF_WEEK'),col('DAY_OF_WEEK_DESC')).show()

Checking these `ACCIDENTDATE` values against a calendar confirms that they are Fridays, so `DAY_OF_WEEK` holds incorrect values for these records.
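The manual calendar check can be automated by deriving the weekday from the date itself. The sketch below is plain Python; it assumes `ACCIDENTDATE` is formatted as dd/mm/yyyy (adjust `fmt` if the data differs), and `weekday_name` and `desc_matches_date` are hypothetical helpers. In Spark, `F.to_date` together with `F.date_format(..., 'EEEE')` could derive the weekday name directly from the column.

```python
from datetime import datetime

# Index 0 = Monday, matching datetime.weekday(); avoids locale-dependent "%A"
DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]


def weekday_name(date_str, fmt="%d/%m/%Y"):
    """Weekday name for a date string; the dd/mm/yyyy default is an assumption."""
    return DAY_NAMES[datetime.strptime(date_str.strip(), fmt).weekday()]


def desc_matches_date(row, fmt="%d/%m/%Y"):
    """True when DAY_OF_WEEK_DESC agrees with the weekday of ACCIDENTDATE."""
    return weekday_name(row["ACCIDENTDATE"], fmt) == row["DAY_OF_WEEK_DESC"]


print(weekday_name("01/03/2019"))  # Friday
print(desc_matches_date({"ACCIDENTDATE": "04/03/2019", "DAY_OF_WEEK_DESC": "Friday"}))  # False
```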

In [None]:
# check the validity of NODE_ID (foreign key)
# check whether each (ACCIDENT_NO, NODE_ID) in df_accident exists in df_node
cond = [df_node.ACCIDENT_NO == df_accident.ACCIDENT_NO, df_node.NODE_ID == df_accident.NODE_ID] 
invalid_node_id = df_accident.join(df_node, cond, how='left_anti')\
                            .select(col('ACCIDENT_NO'),col('NODE_ID'))

# print the number of invalid NODE_ID values
print("Invalid NODE_ID in df_accident  :", invalid_node_id.count())

# view a few invalid records
invalid_node_id.show(2,vertical=True,truncate=False)
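The `left_anti` join keeps the left-hand rows that have no match on the join keys. The plain-Python sketch below reproduces that semantics on made-up rows; `left_anti` here is a hypothetical helper, not the Spark API. One detail worth keeping in mind: in an equi-join a null key never matches, so a record with a null `NODE_ID` is always reported as invalid.

```python
def left_anti(child_rows, parent_rows, keys):
    """Rows of child_rows with no matching key in parent_rows.

    As in an SQL/Spark equi-join, a key containing None never matches,
    so such child rows are always returned.
    """
    def key_of(row):
        return tuple(row[k] for k in keys)

    parent_keys = {key_of(r) for r in parent_rows if None not in key_of(r)}
    return [r for r in child_rows
            if None in key_of(r) or key_of(r) not in parent_keys]


# Made-up rows: A2 points at a node that does not exist, A3 has no NODE_ID
accidents = [{"ACCIDENT_NO": "A1", "NODE_ID": 10},
             {"ACCIDENT_NO": "A2", "NODE_ID": -1},
             {"ACCIDENT_NO": "A3", "NODE_ID": None}]
nodes = [{"ACCIDENT_NO": "A1", "NODE_ID": 10},
         {"ACCIDENT_NO": "A2", "NODE_ID": 20}]

invalid = left_anti(accidents, nodes, ["ACCIDENT_NO", "NODE_ID"])
print([r["ACCIDENT_NO"] for r in invalid])  # ['A2', 'A3']
```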

In [None]:
# check the validity of the NO_PERSONS
# `NO_PERSONS` = `NO_PERSONS_INJ_2` + `NO_PERSONS_INJ_3` + `NO_PERSONS_KILLED` + `NO_PERSONS_NOT_INJ` 

df_accident.agg(F.sum(col('NO_PERSONS_INJ_2')+col('NO_PERSONS_INJ_3')+col('NO_PERSONS_KILLED')+col('NO_PERSONS_NOT_INJ')-col('NO_PERSONS')).alias('Variance')).show()

In [None]:
# list the accidents whose person counts do not add up
df_accident.groupBy("ACCIDENT_NO")\
.agg(F.sum(col('NO_PERSONS_INJ_2')+col('NO_PERSONS_INJ_3')+col('NO_PERSONS_KILLED')+col('NO_PERSONS_NOT_INJ')-col('NO_PERSONS')).alias('Variance'))\
.filter("Variance <> 0").show(truncate=False)

In [None]:
# check the accidents where the statistics of people involved in the accidents are not matching with the formula
df_accident.filter(col('ACCIDENT_NO').isin('T20190014255')).show(2,vertical=True,truncate=False)
#df_accident.filter(col('ACCIDENT_NO').isin('T20190014255')).limit(5).toPandas()

In [None]:
# Explore person dataset for the accident
df_person.filter(col('ACCIDENT_NO').isin('T20190014255')).show(3,vertical=True,truncate=False)
#df_person.filter(col('ACCIDENT_NO').isin('T20190014255')).limit(5).toPandas()

After several manual checks, 234 accidents were found to have incorrect person statistics in the datasets. When joining the tables, the columns in the relationship `NO_PERSONS` = `NO_PERSONS_INJ_2` + `NO_PERSONS_INJ_3` + `NO_PERSONS_KILLED` + `NO_PERSONS_NOT_INJ` need to be adjusted or dropped.
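One way to "adjust" rather than drop these columns is to rebuild them from the person-level data. The plain-Python sketch below does this on made-up rows. The mapping of `INJ_LEVEL` codes 1 to 4 onto the four accident-level columns is an ASSUMPTION to verify against the Crash Stats metadata, and `counts_from_persons` is a hypothetical helper. In Spark, the equivalent would be a `groupBy("ACCIDENT_NO")` on `df_person` with conditional sums such as `F.sum(F.when(...).otherwise(0))`.

```python
from collections import Counter, defaultdict

# ASSUMED mapping from person-level INJ_LEVEL codes to the accident-level
# count columns; verify against the Crash Stats metadata before use.
INJ_LEVEL_TO_COLUMN = {1: "NO_PERSONS_KILLED", 2: "NO_PERSONS_INJ_2",
                       3: "NO_PERSONS_INJ_3", 4: "NO_PERSONS_NOT_INJ"}


def counts_from_persons(person_rows):
    """Rebuild accident-level person counts from person-level rows.

    Persons whose INJ_LEVEL is missing or unmapped are skipped, so
    NO_PERSONS here is the sum of the four rebuilt columns.
    """
    per_accident = defaultdict(Counter)
    for p in person_rows:
        column = INJ_LEVEL_TO_COLUMN.get(p["INJ_LEVEL"])
        if column is not None:
            per_accident[p["ACCIDENT_NO"]][column] += 1
    result = {}
    for accident_no, counter in per_accident.items():
        row = {c: counter[c] for c in INJ_LEVEL_TO_COLUMN.values()}
        row["NO_PERSONS"] = sum(row.values())
        result[accident_no] = row
    return result


# Made-up person rows for two accidents
persons = [{"ACCIDENT_NO": "A1", "INJ_LEVEL": 2},
           {"ACCIDENT_NO": "A1", "INJ_LEVEL": 4},
           {"ACCIDENT_NO": "A1", "INJ_LEVEL": 4},
           {"ACCIDENT_NO": "A2", "INJ_LEVEL": 1}]
print(counts_from_persons(persons))
```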

**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: No duplicate records.
* <strong> **Missing values/null** </strong>:  
    `DIRECTORY`, `EDITION`, `PAGE`, `GRID_REFERENCE_X`, `GRID_REFERENCE_Y` (map/location-related info) have many missing values.  
* <strong> **Outliers** </strong>:  
* <strong> **Data Consistency/Validity** </strong>:  
    `ACCIDENT_TYPE`, `ACCIDENT_TYPE_DESC` are paired correctly.  
    `DCA_CODE`, `DCA_DESC` are paired correctly.  
    `LIGHT_CONDITION`, `LIGHT_CONDITION_DESC` are paired correctly.  
    `ROAD_GEOMETRY`, `ROAD_GEOMETRY_DESC` are paired correctly.  
    `DAY_OF_WEEK`, `DAY_OF_WEEK_DESC` are paired **incorrectly**.  
    `NODE_ID` contains **invalid** values that do not exist in df_node.  
    The relationship `NO_PERSONS` = `NO_PERSONS_INJ_2` + `NO_PERSONS_INJ_3` + `NO_PERSONS_KILLED` + `NO_PERSONS_NOT_INJ` does not hold for some accidents, whose statistics are inconsistent with df_person.
* <strong> **Other** </strong>:  
    `POLICE_ATTEND` has no description; only numeric codes are provided, without explanation.  
    `SEVERITY` has no description; only numeric codes are provided, without explanation.  
    `ACCIDENTTIME` could be grouped into time periods (busy hours etc.).  
    `SPEED_ZONE` has no description for codes 777, 888 and 999.  
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
    `DAY_OF_WEEK` - align to a single index or drop the column  
    `POLICE_ATTEND`, `SEVERITY`, `SPEED_ZONE` - add descriptions  
    `NODE_ID` - remove invalid `NODE_ID` values or remove the affected records  
    `ACCIDENTTIME` - group into time periods (peak, non-peak etc.)  
    `NO_PERSONS`, `NO_PERSONS_INJ_2`, `NO_PERSONS_INJ_3`, `NO_PERSONS_KILLED`, `NO_PERSONS_NOT_INJ` - adjust or drop  
    Join operations need performance tuning.
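Grouping `ACCIDENTTIME` into periods amounts to bucketing the hour. The plain-Python sketch below shows the idea; the period boundaries are HYPOTHETICAL and should be chosen from the data, and because the exact `ACCIDENTTIME` format is not confirmed here, the hypothetical `time_period` helper accepts both `.` and `:` separators and tolerates trailing spaces.

```python
def time_period(accident_time):
    """Bucket an ACCIDENTTIME string into a coarse period.

    The period boundaries below are HYPOTHETICAL; choose them from the data.
    Accepts 'HH.MM.SS' or 'HH:MM:SS' and tolerates trailing spaces.
    """
    hour = int(accident_time.strip().replace(".", ":").split(":")[0])
    if 7 <= hour < 10:
        return "Morning peak"
    if 16 <= hour < 19:
        return "Evening peak"
    if hour >= 22 or hour < 6:
        return "Night"
    return "Off-peak"


for t in ["08.15.00 ", "17:30:00", "23.05.00", "12.00.00"]:
    print(repr(t), "->", time_period(t))
```

Keeping the boundaries in one function makes it easy to try alternative groupings before fixing the definition used for modelling.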
     

<a class="anchor" id="explore_accident_chainage"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> accident_chainage </strong> </div>

In [None]:
#Summary Statistics : df_accident_chainage
df_accident_chainage.describe().toPandas()

In [None]:
# view some data
df_accident_chainage.show(truncate=False)
#df_accident_chainage.limit(5).toPandas()

`NODE_ID` uses a different notation from the other datasets.

In [None]:
# checking missing/null values : df_accident_chainage
df_accident_chainage.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_accident_chainage.columns]).show(truncate=False)

In [None]:
# check the number of unique values in each column
df_accident_chainage.select([countDistinct(c).alias(c) for c in df_accident_chainage.columns]).show(vertical=True,truncate=False)

In [None]:
# check the frequency of each value in each column
for c in df_accident_chainage.columns:
    df_accident_chainage.groupBy(c).count().sort(col("count").desc()).show(10)

In [None]:
# check duplicate
df_accident_chainage.groupBy("NODE_ID", "CHAINAGE_SEQ").count().filter("count > 1").show()

In [None]:
# check validity of Node_ID (foreign key)
# check whether node_id in df_accident_chainage  exists in df_node
invalid_node_id = df_accident_chainage.join(df_node, df_node['NODE_ID'] == df_accident_chainage["NODE_ID"], how='left_anti')\
                            .select(col('NODE_ID'))

# print the number of invalid NODE_ID
print("Invalid NODE_ID in df_accident_chainage  :", invalid_node_id.count())

# view few invalid info
invalid_node_id.show(truncate=False)

`NODE_ID` in df_accident_chainage uses a different notation/format from df_node.    
Therefore, this dataset cannot be looked up/joined and is not useful for modelling.

**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: No duplicate records.
* <strong> **Missing values/null** </strong>: null values are present in all columns.
* <strong> **Data Consistency/Validity** </strong>: `NODE_ID` values cannot be matched to df_node. 
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
Since the information is not useful for modelling, this dataset will not be used. No further action is required.

<a class="anchor" id="explore_accident_event"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> accident_event </strong> </div>

In [None]:
#Summary Statistics : df_accident_event
df_accident_event.describe().toPandas()

The key of the dataset is the combination of `ACCIDENT_NO` & `EVENT_SEQ_NO`.  
At least one vehicle (`VEHICLE_1_ID`) is recorded for each accident event.  
Not all events have person information recorded (`PERSON_ID`).  

In [None]:
# view some data
#df_accident_event.show(5,vertical=True,truncate=False)
df_accident_event.limit(5).toPandas()

In [None]:
# checking missing/null values : df_accident_event 
df_accident_event.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_accident_event.columns]).show(2,vertical=True,truncate=False)

Four records contain missing values.

In [None]:
# check the number of unique values in each column
df_accident_event.select([countDistinct(c).alias(c) for c in df_accident_event.columns]).show(vertical=True,truncate=False)

In [None]:
# check the frequency of each value in each column
for c in df_accident_event.columns:
    df_accident_event.groupBy(c).count().sort(col("count").desc()).show(truncate=False)

Several code/description pairs are provided:  
  `EVENT_TYPE` & `EVENT_TYPE_DESC`  
  `VEHICLE_1_COLL_PT` & `VEHICLE_1_COLL_PT_DESC`  
  `VEHICLE_2_COLL_PT` & `VEHICLE_2_COLL_PT_DESC`  
  `OBJECT_TYPE` & `OBJECT_TYPE_DESC`   
There are foreign keys to other datasets: `ACCIDENT_NO`, `VEHICLE_1_ID`, `VEHICLE_2_ID`, `PERSON_ID`

In [None]:
# check duplicate
df_accident_event.groupBy("ACCIDENT_NO", "EVENT_SEQ_NO").count().filter("count > 1").show()

In [None]:
# display records that have missing values:
#df_accident_event.filter(col('EVENT_SEQ_NO').isNull()).show(5,truncate=False)
df_accident_event.filter(col('EVENT_SEQ_NO').isNull()).limit(5).toPandas()

No event is recorded for these `ACCIDENT_NO` values.

In [None]:
# check whether these records with missing values, have other records related to the same ACCIDENT_NO within df_accident_event
#df_accident_event.filter(col('ACCIDENT_NO').isin('T20170002776','T20170013375','T20170013384','T20170013459')).show(5,truncate=False)
df_accident_event.filter(col('ACCIDENT_NO').isin('T20170002776','T20170013375','T20170013384','T20170013459')).limit(5).toPandas()

No other event is recorded for these `ACCIDENT_NO`.

In [None]:
# check whether these ACCIDENT_NO exist in df_accident
#df_accident.filter(col('ACCIDENT_NO').isin('T20170002776','T20170013375','T20170013384','T20170013459')).show(5,truncate=False)
df_accident.filter(col('ACCIDENT_NO').isin('T20170002776','T20170013375','T20170013384','T20170013459')).limit(5).toPandas()

No event is recorded for `ACCIDENT_NO` ('T20170002776','T20170013375','T20170013384','T20170013459').

In [None]:
# check pairing values: EVENT_TYPE & EVENT_TYPE_DESC
df_accident_event.groupBy("EVENT_TYPE","EVENT_TYPE_DESC").count().sort(col("EVENT_TYPE")).show()

In [None]:
# check pairing values: VEHICLE_1_COLL_PT & VEHICLE_1_COLL_PT_DESC
df_accident_event.groupBy("VEHICLE_1_COLL_PT","VEHICLE_1_COLL_PT_DESC").count().sort(col("VEHICLE_1_COLL_PT")).show()

In [None]:
# check pairing values: VEHICLE_2_COLL_PT & VEHICLE_2_COLL_PT_DESC
df_accident_event.groupBy("VEHICLE_2_COLL_PT","VEHICLE_2_COLL_PT_DESC").count().sort(col("VEHICLE_2_COLL_PT")).show()

In [None]:
# count records where VEHICLE_2_COLL_PT_DESC holds a single space " " (i.e. the value is effectively missing)
df_accident_event.filter(col('VEHICLE_2_COLL_PT_DESC') == " ").count()

`VEHICLE_2_COLL_PT_DESC` (and its paired code `VEHICLE_2_COLL_PT`) contains " " where the value is in fact missing. These entries should be replaced with null.
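
Replacing whitespace-only strings with a real null makes these missing values visible to `isNull()` checks and to `fillna`. Below is a minimal plain-Python sketch of the rule. The helper names and toy rows are hypothetical. In PySpark the same idea could be written per column as `when(trim(col(c)) == "", lit(None)).otherwise(col(c))`.

```python
def blank_to_none(value):
    """Return None for empty or whitespace-only strings; leave other values unchanged."""
    if isinstance(value, str) and value.strip() == "":
        return None
    return value


def clean_record(record, columns):
    """Apply blank_to_none to the given columns of one record (a dict)."""
    return {k: (blank_to_none(v) if k in columns else v) for k, v in record.items()}


# Hypothetical toy rows: the first has a blank second-vehicle collision point.
rows = [
    {"ACCIDENT_NO": "T1", "VEHICLE_2_COLL_PT": " ", "VEHICLE_2_COLL_PT_DESC": " "},
    {"ACCIDENT_NO": "T2", "VEHICLE_2_COLL_PT": "X", "VEHICLE_2_COLL_PT_DESC": "desc x"},
]
cols = {"VEHICLE_2_COLL_PT", "VEHICLE_2_COLL_PT_DESC"}
cleaned = [clean_record(r, cols) for r in rows]
print(cleaned[0])
```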

In [None]:
# check pairing values: "OBJECT_TYPE" & "OBJECT_TYPE_DESC"
df_accident_event.groupBy("OBJECT_TYPE","OBJECT_TYPE_DESC").count().sort(col("OBJECT_TYPE")).show(truncate=False)
#df_accident_event.groupBy("OBJECT_TYPE","OBJECT_TYPE_DESC").count().sort(col("OBJECT_TYPE")).limit(20).toPandas()

In [None]:
# check ACCIDENT_NO validity (foreign key)

# check whether ACCIDENT_NO in df_accident_event exists in df_accident
invalid_accident_id = df_accident_event.join(df_accident, df_accident_event.ACCIDENT_NO == df_accident.ACCIDENT_NO, how='left_anti')\
                            .select(col('ACCIDENT_NO'))

# print the number of invalid ACCIDENT_NO
print("Invalid ACCIDENT_NO in df_accident_event :", invalid_accident_id.count())

# view a few invalid records
invalid_accident_id.show(2,vertical=True,truncate=False)
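
Spark's `left_anti` join returns the rows of the left DataFrame that have no match on the right. That is exactly the "orphan foreign key" set counted above. Below is a minimal plain-Python sketch of the same semantics, which also supports composite keys as used for `ACCIDENT_NO` & `NODE_ID`. The helper `left_anti_join` and the toy rows are hypothetical.

```python
def left_anti_join(left_rows, right_rows, keys):
    """Rows of left_rows whose key tuple has no match in right_rows (like how='left_anti')."""
    def key_of(row):
        return tuple(row[k] for k in keys)

    right_keys = {key_of(row) for row in right_rows}
    return [row for row in left_rows if key_of(row) not in right_keys]


# Hypothetical toy data: accident A3 is referenced but does not exist.
accidents = [{"ACCIDENT_NO": "A1"}, {"ACCIDENT_NO": "A2"}]
events = [
    {"ACCIDENT_NO": "A1", "EVENT_SEQ_NO": 1},
    {"ACCIDENT_NO": "A3", "EVENT_SEQ_NO": 1},
]
orphans = left_anti_join(events, accidents, ["ACCIDENT_NO"])
print(len(orphans))  # → 1
```

One difference is worth keeping in mind. In this sketch `None` keys compare equal, whereas in a Spark equi-join a null key never matches. Rows with a null `ACCIDENT_NO` would therefore always appear in the Spark `left_anti` result.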


**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: No duplicate records.  
* <strong> **Missing values/null** </strong>: There are 4 records with no event described; these records should be removed.  
* <strong> **Outliers** </strong>:  
* <strong> **Data Consistency/Validity** </strong>:  
    `EVENT_TYPE`,`EVENT_TYPE_DESC` are paired correctly.  
    `VEHICLE_1_COLL_PT`,`VEHICLE_1_COLL_PT_DESC` are paired correctly.  
    `VEHICLE_2_COLL_PT`,`VEHICLE_2_COLL_PT_DESC` are paired correctly. Records with missing values (" ") should be replaced with null.  
    `PERSON_ID`: records with missing values should be replaced with null.  
    `OBJECT_TYPE`,`OBJECT_TYPE_DESC` are paired correctly.  
* <strong> **Other** </strong>:  
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
Since similar, higher-level information is available in the accident and vehicle datasets, this dataset will not be used for modelling. No further action is required.

<a class="anchor" id="explore_accident_location"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> accident_location </strong> </div>

In [None]:
#Summary Statistics : df_accident_location
df_accident_location.describe().toPandas()

Except for `ACCIDENT_NO` and `NODE_ID`, all columns contain missing values or space characters.  
The minimum value of `NODE_ID` is -1. Further exploration is required.  
The `min` and `max` values of each column show records filled with a space character " ", negative values (e.g. -1), or placeholder-like values such as 'Z;;;', '9999' and '999'.
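
Before deciding how to treat these values, it can help to count them per column. Below is a minimal plain-Python sketch that flags whitespace-only strings, negative numbers and the placeholder strings listed above. Treating '999'/'9999'/'Z;;;' as sentinels in *every* column is an assumption that should be checked column by column. The helper names and toy rows are hypothetical.

```python
# Placeholder-like values seen in describe(); treating them as sentinels is an assumption.
SENTINELS = {"Z;;;", "9999", "999"}


def is_suspect(value):
    """True for whitespace-only strings, known placeholder strings, or negative numbers."""
    if value is None:
        return False  # true nulls are already counted by the isNull() check
    text = str(value).strip()
    if text == "" or text in SENTINELS:
        return True
    try:
        return float(text) < 0
    except ValueError:
        return False


def suspect_counts(rows, columns):
    """Count suspect values per column."""
    return {c: sum(is_suspect(row.get(c)) for row in rows) for c in columns}


# Hypothetical toy rows.
rows = [
    {"NODE_ID": -1, "ROAD_NAME": " ", "NEAREST_KM_POST": "9999"},
    {"NODE_ID": 42, "ROAD_NAME": "ROAD A", "NEAREST_KM_POST": "12.5"},
]
print(suspect_counts(rows, ["NODE_ID", "ROAD_NAME", "NEAREST_KM_POST"]))
```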

In [None]:
# view some data
#df_accident_location.show(2,vertical=True,truncate=False)
df_accident_location.limit(10).toPandas()

One accident can have multiple `NODE_ID` values.  
`ROAD_ROUTE` has no relationship with `ROAD_NAME`.  
`DIRECTION_LOCATION` could use more descriptive labels to indicate the direction.  

In [None]:
# checking missing/null values : df_accident_location
#df_accident_location.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_accident_location.columns]).toPandas().transpose()
df_accident_location.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_accident_location.columns]).show(vertical=True,truncate=False)

Except for `ACCIDENT_NO` and `NODE_ID`, all columns contain missing values.

In [None]:
# count distinct values for each column
#df_accident_location.select([countDistinct(c).alias(c) for c in df_accident_location.columns]).toPandas().transpose()
df_accident_location.select([countDistinct(c).alias(c) for c in df_accident_location.columns]).show(vertical=True,truncate=False)

In [None]:
# check the frequency of each distinct value per column (a loop, since show() is called for its output only)
for c in df_accident_location.columns:
    df_accident_location.groupBy(c).count().sort(col("count").desc()).show(truncate=False)

This dataset mainly stores the location where each accident happened.  
Many records have missing values.  

In [None]:
# check duplicate
df_accident_location.groupBy("ACCIDENT_NO", "NODE_ID").count().filter("count > 1").show()

In [None]:
# check route information for NODE_ID with negative values
df_accident_location.filter(col('NODE_ID').isin('-1','-3','-10')).limit(10).toPandas()

In [None]:
# check which NODE_ID values have no accident location (ROAD_NAME is null)
df_accident_location.filter(col('ROAD_NAME').isNull())\
                    .groupBy('NODE_ID').count().show(15)
#                    .groupBy('NODE_ID').count().limit(15).toPandas()

Records with missing accident location should be removed.

In [None]:
# check ACCIDENT_NO & NODE_ID validity (foreign key)

# check whether ACCIDENT_NO & NODE_ID in df_accident_location exist in df_accident
cond = [df_accident_location.ACCIDENT_NO == df_accident.ACCIDENT_NO, df_accident_location.NODE_ID == df_accident.NODE_ID]
invalid_accident_node = df_accident_location.join(df_accident, cond, how='left_anti')\
                            .select(col('ACCIDENT_NO'), col('NODE_ID'))

# print the number of invalid ACCIDENT_NO & NODE_ID
print("Invalid ACCIDENT_NO & NODE_ID in df_accident_location :", invalid_accident_node.count())

# view a few invalid records
invalid_accident_node.show(truncate=False)


**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: No duplicate records.
* <strong> **Missing values/null** </strong>: Some records have no accident location, although `ROAD_NAME_INT` appears to provide suburb information. These records should be removed.  
    `ROAD_ROUTE_1, ROAD_NAME, ROAD_TYPE, ROAD_NAME_INT, ROAD_TYPE_INT, DISTANCE_LOCATION, DIRECTION_LOCATION, NEAREST_KM_POST, OFF_ROAD_LOCATION` - all have null values
* <strong> **Outliers** </strong>:  
* <strong> **Data Consistency/Validity** </strong>:  
* <strong> **Other** </strong>:  
    `DIRECTION_LOCATION` - should be filled with more descriptive labels      
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
Since similar, higher-level information is available in the node dataset, this dataset will not be used for modelling. No further action is required.

<a class="anchor" id="explore_atmosp_cond"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> atmospheric_cond </strong> </div>

In [None]:
#Summary Statistics : df_atmospheric_cond
df_atmospheric_cond.describe().toPandas()

This dataset appears to use a composite key of `ACCIDENT_NO` and `ATMOSPH_COND_SEQ` (to be confirmed in the exploration below).  
There are 8 atmospheric conditions, encoded as `ATMOSPH_COND` with the description `ATMOSPH_COND_DESC`.
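
To confirm a composite key, each combination of key columns must occur exactly once. Below is a minimal plain-Python sketch of that check, mirroring the notebook's `groupBy(...).count().filter("count > 1")` pattern. The helper `duplicate_keys` and the toy rows are hypothetical.

```python
from collections import Counter


def duplicate_keys(rows, keys):
    """Return {key_tuple: count} for key combinations that occur more than once."""
    counts = Counter(tuple(row[k] for k in keys) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


keys = ["ACCIDENT_NO", "ATMOSPH_COND_SEQ"]
# Hypothetical toy rows: every (ACCIDENT_NO, ATMOSPH_COND_SEQ) pair is unique.
rows = [
    {"ACCIDENT_NO": "A1", "ATMOSPH_COND_SEQ": 1},
    {"ACCIDENT_NO": "A1", "ATMOSPH_COND_SEQ": 2},
    {"ACCIDENT_NO": "A2", "ATMOSPH_COND_SEQ": 1},
]
print(duplicate_keys(rows, keys))  # → {}
```

An empty result supports the composite-key assumption; any returned tuple identifies the offending key combination.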

In [None]:
# view some data
#df_atmospheric_cond.show(2,vertical=True,truncate=False)
df_atmospheric_cond.limit(5).toPandas()

In [None]:
# checking missing/null values : df_atmospheric_cond
df_atmospheric_cond.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_atmospheric_cond.columns]).show(vertical=True,truncate=False)
#df_atmospheric_cond.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_atmospheric_cond.columns]).toPandas()

In [None]:
# count distinct values for each column
df_atmospheric_cond.select([countDistinct(c).alias(c) for c in df_atmospheric_cond.columns]).show(vertical=True,truncate=False)
#df_atmospheric_cond.select([countDistinct(c).alias(c) for c in df_atmospheric_cond.columns]).toPandas()

In [None]:
# check the frequency of each distinct value per column (a loop, since show() is called for its output only)
for c in df_atmospheric_cond.columns:
    df_atmospheric_cond.groupBy(c).count().sort(col(c)).show(truncate=False)

In [None]:
# check pairing values & their frequency: ATMOSPH_COND & ATMOSPH_COND_DESC
df_atmospheric_cond.groupBy("ATMOSPH_COND","ATMOSPH_COND_DESC").count().sort(col("ATMOSPH_COND")).show(truncate=False)
#df_atmospheric_cond.groupBy("ATMOSPH_COND","ATMOSPH_COND_DESC").count().sort(col("ATMOSPH_COND")).toPandas()

In [None]:
# check duplicate
df_atmospheric_cond.groupBy("ACCIDENT_NO", "ATMOSPH_COND_DESC").count().filter("count > 1").show()

In [None]:
# check ACCIDENT_NO validity (foreign key)

# check whether ACCIDENT_NO in df_atmospheric_cond exists in df_accident
invalid_accident_id = df_atmospheric_cond.join(df_accident, df_atmospheric_cond.ACCIDENT_NO == df_accident.ACCIDENT_NO, how='left_anti')\
                            .select(col('ACCIDENT_NO'))

# print the number of invalid ACCIDENT_NO
print("Invalid ACCIDENT_NO in df_atmospheric_cond  :", invalid_accident_id.count())

# view a few invalid records
invalid_accident_id.show(truncate=False)

**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: No duplicate records.
* <strong> **Missing values/null** </strong>: No missing values   
* <strong> **Outliers** </strong>:  
* <strong> **Data Consistency/Validity** </strong>:  
* <strong> **Other** </strong>:  
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
One accident can have several atmospheric conditions. To avoid duplicating accident information, this dataset needs to be converted from long to wide format, with each atmospheric condition forming its own column.  
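
The long-to-wide conversion described above can be sketched in plain Python. Each accident becomes one row, and each condition becomes a "Yes"/"No" column. This is a minimal sketch with a hypothetical helper `pivot_flags` and toy rows, not the PySpark code used later in the notebook.

```python
def pivot_flags(rows, id_col, value_col, categories):
    """Long -> wide: one row per id, with a "Yes"/"No" column for each category."""
    present = {}
    for row in rows:
        present.setdefault(row[id_col], set()).add(row[value_col])
    wide = []
    for row_id in sorted(present):  # sorted output, like .sort("ACCIDENT_NO")
        out = {id_col: row_id}
        for cat in categories:
            out[cat] = "Yes" if cat in present[row_id] else "No"
        wide.append(out)
    return wide


# Hypothetical toy rows in long format.
rows = [
    {"ACCIDENT_NO": "A1", "ATMOSPH_COND_DESC": "Clear"},
    {"ACCIDENT_NO": "A1", "ATMOSPH_COND_DESC": "Raining"},
    {"ACCIDENT_NO": "A2", "ATMOSPH_COND_DESC": "Clear"},
]
for r in pivot_flags(rows, "ACCIDENT_NO", "ATMOSPH_COND_DESC", ["Clear", "Raining", "Fog"]):
    print(r)
```

In PySpark, `GroupedData.pivot` also accepts an explicit list of values (e.g. `pivot('ATMOSPH_COND_DESC', [...])`). Supplying the list fixes the output columns and avoids the extra pass Spark otherwise needs to compute the distinct values.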


<a class="anchor" id="explore_node"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> node </strong> </div>

In [None]:
#Summary Statistics : df_node
df_node.describe().toPandas()

In [None]:
# view some data
df_node.show(2,vertical=True,truncate=False)

In [None]:
# checking missing/null values : df_node
df_node.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_node.columns]).show(vertical=True,truncate=False)

In [None]:
# count distinct values for each column
df_node.select([countDistinct(c).alias(c) for c in df_node.columns]).show(vertical=True,truncate=False)

In [None]:
# check the frequency of each distinct value per column (a loop, since show() is called for its output only)
for c in df_node.columns:
    df_node.groupBy(c).count().sort(col(c)).show(truncate=False)

In [None]:
# check duplicate records
df_node.groupby("NODE_ID","ACCIDENT_NO").agg(count(col("ACCIDENT_NO")).alias('num_of_accidents')).sort(col("NODE_ID"))\
        .filter(col('num_of_accidents') > 1).show()

In [None]:
# check duplicated records
df_node.filter(col('ACCIDENT_NO').isin('T20080047570','T20110035570')).show(truncate=False)

Records are duplicated because of different `POSTCODE_NO` values. In addition, for the same postcode, `DEG_URBAN_NAME` has different labels.

In [None]:
# check DEG_URBAN_NAME
df_node.groupBy("POSTCODE_NO","LGA_NAME","DEG_URBAN_NAME").count().sort(col("POSTCODE_NO")).show(5,truncate=False)

In [None]:
# check records with region_name empty/missing values
df_node.filter(col("REGION_NAME")==" ").show(5,truncate=False)

In [None]:
# check whether REGION_NAME exist in the same dataframe based on postcode
df_node.filter(col("POSTCODE_NO") == "3030").show(2,truncate=False)

In [None]:
# get a list of description of "LGA_NAME","LGA_NAME_ALL","REGION_NAME"  for each postcode
df_node.groupby("POSTCODE_NO","LGA_NAME","LGA_NAME_ALL","REGION_NAME", "DEG_URBAN_NAME").count().sort(col("POSTCODE_NO")).show(truncate=False)

`LGA_NAME_ALL`, `LGA_NAME`, `REGION_NAME`, `DEG_URBAN_NAME`:
- contain missing values;
- `LGA_NAME` and `REGION_NAME` have different labels or combinations for the same `LGA_NAME_ALL`;
- the same `POSTCODE_NO` can have different `DEG_URBAN_NAME` values.
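
One way to fill these gaps is to borrow the most common non-blank label recorded for the same postcode. Below is a minimal plain-Python sketch of this approach. Using the postcode as the grouping key is an assumption, and the helper names and toy rows are hypothetical.

```python
from collections import Counter, defaultdict


def is_blank(value):
    """Treat None and whitespace-only strings as missing."""
    return value is None or str(value).strip() == ""


def fill_by_group_mode(rows, group_col, target_col):
    """Fill blank target_col values with the most common non-blank value for the same group_col."""
    counts = defaultdict(Counter)
    for row in rows:
        if not is_blank(row[target_col]):
            counts[row[group_col]][row[target_col]] += 1
    filled = []
    for row in rows:
        new_row = dict(row)  # copy so the input rows are not mutated
        group_counts = counts.get(new_row[group_col])
        if is_blank(new_row[target_col]) and group_counts:
            new_row[target_col] = group_counts.most_common(1)[0][0]
        filled.append(new_row)
    return filled


# Hypothetical toy rows; postcode 3031 has no known region, so it stays blank.
rows = [
    {"POSTCODE_NO": "3030", "REGION_NAME": "REGION_X"},
    {"POSTCODE_NO": "3030", "REGION_NAME": "REGION_X"},
    {"POSTCODE_NO": "3030", "REGION_NAME": " "},
    {"POSTCODE_NO": "3031", "REGION_NAME": " "},
]
filled = fill_by_group_mode(rows, "POSTCODE_NO", "REGION_NAME")
print([r["REGION_NAME"] for r in filled])
```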

In [None]:
# check validity of ACCIDENT_NO & NODE_ID

# check whether the same ACCIDENT_NO & NODE_ID in df_node exists in df_accident
cond = [df_node.ACCIDENT_NO == df_accident.ACCIDENT_NO, df_node.NODE_ID == df_accident.NODE_ID] 
invalid_accident_node = df_node.join(df_accident, cond, how='left_anti')\
                            .select(col('ACCIDENT_NO'),col('NODE_ID'))

# print the number of invalid ACCIDENT_NO & NODE_ID
print("Invalid ACCIDENT_NO & NODE_ID in df_node  :", invalid_accident_node.count())

# view a few invalid records
invalid_accident_node.show(truncate=False)


**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: Some records are duplicated because of different `POSTCODE_NO` values for the same `ACCIDENT_NO` and `NODE_ID`.  
* <strong> **Missing values/null** </strong>: `LGA_NAME` and `REGION_NAME` have missing values.  
* <strong> **Outliers** </strong>: No outliers detected; all postcodes are Victorian postcodes.  
* <strong> **Data Consistency/Validity** </strong>:  
    `LGA_NAME_ALL`, `LGA_NAME`, `REGION_NAME`: the same suburb can have different `LGA_NAME_ALL`, `LGA_NAME` or `REGION_NAME` values.  
    `DEG_URBAN_NAME`: the same `POSTCODE_NO` can have different `DEG_URBAN_NAME` labels.  
    * Assumption: each `REGION_NAME` covers several `LGA_NAME_ALL`/`LGA_NAME` values, and different pockets of development within it are labelled with different `DEG_URBAN_NAME` values.
* <strong> **Other** </strong>:  
    `NODE_TYPE` requires more descriptive labels.
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
* Fill in empty values of `LGA_NAME_ALL`, `LGA_NAME`, `REGION_NAME` and `DEG_URBAN_NAME`, and align their descriptions.  
* Remove duplicate entries that share the same `ACCIDENT_NO` and `NODE_ID`.
* Removing the duplicates has also removed the entries with missing values.
* Update `NODE_TYPE` with more descriptive labels.
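
`dropDuplicates(['ACCIDENT_NO', 'NODE_ID'])` keeps one row per key but does not guarantee which one. If the row with missing values should be the one discarded, the choice can be made explicit. Below is a minimal plain-Python sketch that keeps the row with the fewest blank fields. The tie-breaking rule is an assumption, and the helper names and toy rows are hypothetical. In PySpark a comparable rule could be built with a `Window` partitioned by the key and `row_number()` ordered by a blank count.

```python
def blank_count(row):
    """Number of fields in a row that are None or whitespace-only."""
    return sum(1 for v in row.values() if v is None or str(v).strip() == "")


def dedupe_most_complete(rows, keys):
    """Keep one row per key tuple, preferring the row with the fewest blank fields.

    Ties keep the first row seen.
    """
    best = {}
    for row in rows:
        key = tuple(row[k] for k in keys)
        if key not in best or blank_count(row) < blank_count(best[key]):
            best[key] = row
    return list(best.values())


# Hypothetical toy rows: A1/10 appears twice with different postcodes.
rows = [
    {"ACCIDENT_NO": "A1", "NODE_ID": 10, "POSTCODE_NO": "3030", "REGION_NAME": " "},
    {"ACCIDENT_NO": "A1", "NODE_ID": 10, "POSTCODE_NO": "3031", "REGION_NAME": "REGION_X"},
    {"ACCIDENT_NO": "A2", "NODE_ID": 11, "POSTCODE_NO": "3030", "REGION_NAME": "REGION_X"},
]
deduped = dedupe_most_complete(rows, ["ACCIDENT_NO", "NODE_ID"])
print(len(deduped))  # → 2
```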

<a class="anchor" id="explore_node_id"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> node_id</strong> </div>

In [None]:
#Summary Statistics : df_node_id
df_node_id.describe().toPandas()

In [None]:
# view some data
df_node_id.show(2,vertical=True,truncate=False)

In [None]:
# checking missing/null values : df_node_id
df_node_id.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_node_id.columns]).show(vertical=True,truncate=False)

In [None]:
# count distinct values for each column
df_node_id.select([countDistinct(c).alias(c) for c in df_node_id.columns]).show(vertical=True,truncate=False)

In [None]:
# check the frequency of each distinct value per column (a loop, since show() is called for its output only)
for c in df_node_id.columns:
    df_node_id.groupBy(c).count().sort(col(c)).show(truncate=False)

In [None]:
# check duplicate
df_node_id.groupBy("ACCIDENT_NO", "NODE_ID","COMPLEX_INT_NO").count().filter("count > 1").show()

In [None]:
# check validity of ACCIDENT_NO & NODE_ID

# check whether the same ACCIDENT_NO & NODE_ID in df_node_id exists in df_node
cond = [df_node_id.ACCIDENT_NO == df_node.ACCIDENT_NO, df_node_id.NODE_ID == df_node.NODE_ID]
invalid_accident_node = df_node_id.join(df_node, cond, how='left_anti')\
                            .select(col('ACCIDENT_NO'),col('NODE_ID'))

# print the number of invalid ACCIDENT_NO & NODE_ID
print("Invalid ACCIDENT_NO & NODE_ID in df_node_id  :", invalid_accident_node.count())

# view a few invalid records
invalid_accident_node.show(10,truncate=False)

**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: No duplicate records.
* <strong> **Missing values/null** </strong>: 
    A significant number of `COMPLEX_INT_NO` values are null.
    `NODE_ID` values -1, -3 and -10, which correspond to unknown accident locations in `df_accident_location`, also exist in this dataset.
* <strong> **Outliers** </strong>:  
* <strong> **Data Consistency/Validity** </strong>:  
    `ACCIDENT_NO` & `NODE_ID`: 854 records do not exist in `df_node`. 
* <strong> **Other** </strong>:  
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
This dataset is not relevant to the modelling. Therefore, no further action is required.

<a class="anchor" id="explore_person"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> person </strong> </div>

In [None]:
df_person.persist()

In [None]:
#Summary Statistics: df_person
df_person.describe().toPandas()

In [None]:
# view some data
df_person.show(2,vertical=True,truncate=False)

In [None]:
# checking missing/null values : df_person
df_person.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_person.columns]).show(vertical=True,truncate=False)

In [None]:
# count distinct values for each column
df_person.select([countDistinct(c).alias(c) for c in df_person.columns]).show(vertical=True,truncate=False)

In [None]:
# check the frequency of each distinct value per column (a loop, since show() is called for its output only)
for c in df_person.columns:
    df_person.groupBy(c).count().sort(col(c)).show(truncate=False)

In [None]:
# check duplicate
df_person.groupBy("ACCIDENT_NO", "PERSON_ID").count().filter("count > 1").show()

In [None]:
# check pairing and frequency: correctness of assignment of Age to Age Group
df_person.crosstab("AGE_GROUP","AGE").toPandas()

`AGE` values are correctly assigned to `AGE_GROUP`.  
There are 21,629 records with an unknown `AGE_GROUP` because `AGE` is missing.

In [None]:
# check pairing and frequency: check the injury level vs age group
df_person.crosstab("AGE_GROUP","INJ_LEVEL_DESC").show()

Since we are predicting the severity of road accidents, information on injury level is essential. Records with both an unknown injury level and an unknown `AGE_GROUP` could be removed from the dataset.

In [None]:
# check pairing and frequency: SEX vs INJ_LEVEL_DESC
df_person.crosstab("SEX","INJ_LEVEL_DESC").show()

For approximately 4% of records, `SEX` is not known. Most persons are not injured (`Not injured`).  
Records with `SEX = U` could be filled with genders drawn from the distribution of the known values.
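
The distribution-based fill mentioned above can be sketched as weighted random sampling from the known values. This is a minimal plain-Python sketch. The helper name is hypothetical, "M"/"F" are assumed codes for the known values, and the seed is only there to make the result reproducible.

```python
import random
from collections import Counter


def impute_by_distribution(values, unknown="U", seed=0):
    """Replace `unknown` entries with random draws weighted by the known-value frequencies."""
    known = Counter(v for v in values if v != unknown)
    if not known:
        return list(values)  # nothing to sample from
    rng = random.Random(seed)  # seeded for reproducibility
    labels = list(known)
    weights = [known[label] for label in labels]
    return [rng.choices(labels, weights=weights)[0] if v == unknown else v for v in values]


# Hypothetical toy column.
sex = ["M", "F", "F", "U", "U"]
print(impute_by_distribution(sex))
```

Because the draws are random, this keeps the overall gender proportions roughly intact but assigns individual records arbitrarily. Filling with an explicit "unknown" label is the more conservative alternative.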

In [None]:
# check pairing and frequency
df_person.crosstab("ROAD_USER_TYPE_DESC","INJ_LEVEL_DESC").show(truncate=False)

In [None]:
# check pairing and frequency: TAKEN_HOSPITAL vs INJ_LEVEL_DESC
df_person.crosstab("TAKEN_HOSPITAL","INJ_LEVEL_DESC").show(truncate=False)

In [None]:
# check pairing and frequency: HELMET_BELT_WORN vs INJ_LEVEL_DESC
df_person.crosstab("HELMET_BELT_WORN","INJ_LEVEL_DESC").show(truncate=False)

In [None]:
# check pairing and frequency: EJECTED_CODE vs INJ_LEVEL_DESC
df_person.crosstab("EJECTED_CODE","INJ_LEVEL_DESC").show(truncate=False)

In [None]:
# check pairing and frequency
df_person.crosstab("ROAD_USER_TYPE_DESC","AGE_GROUP").show(truncate=False)

In [None]:
# check the info for person with Unknown injury
df_person.filter(col("INJ_LEVEL_DESC")=="Unknown").toPandas()

Records with an unknown `INJ_LEVEL` are mainly drivers (`ROAD_USER_TYPE_DESC` = 'Driver') with an unknown `SEX`. They are also blank or unknown on various other attributes (e.g. `SEATING_POSITION`, `HELMET_BELT_WORN`, `TAKEN_HOSPITAL` etc.). 

In [None]:
# check ACCIDENT_NO validity (foreign key)

# check whether ACCIDENT_NO in df_person exists in df_accident
invalid_accident_id = df_person.join(df_accident, df_person.ACCIDENT_NO == df_accident.ACCIDENT_NO, how='left_anti')\
                            .select(col('ACCIDENT_NO'))

# print the number of invalid ACCIDENT_NO
print("Invalid ACCIDENT_NO in df_person  :", invalid_accident_id.count())

# view a few invalid records
invalid_accident_id.show(truncate=False)

**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: No duplicate records.
* <strong> **Missing values/null** </strong>:   
    `VEHICLE_ID`, `SEX`, `INJ_LEVEL`, `SEATING_POSITION`, `HELMET_BELT_WORN`, `ROAD_USER_TYPE`, `LICENCE_STATE`, `PEDEST_MOVEMENT`, `TAKEN_HOSPITAL`, `EJECTED_CODE` contain missing values represented by a space character.

* <strong> **Outliers** </strong>:  
* <strong> **Data Consistency/Validity** </strong>:  
    * `INJ_LEVEL_DESC` is validated against `INJ_LEVEL`
    * `AGE_GROUP` is validated against `AGE`
    * `AGE_GROUP` is to be reclassified into sensible age ranges
* <strong> **Other** </strong>:  
    `SEX`, `SEATING_POSITION`, `HELMET_BELT_WORN`, `TAKEN_HOSPITAL` & `EJECTED_CODE`: require more descriptive labels 
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
* `VEHICLE_ID` - impute after joining VEHICLE table
* `SEX` - fill in blank values with "unknown" 
* `INJ_LEVEL` - impute missing values with most frequent value/mode
* `INJ_LEVEL_DESC` - impute unknown values based on INJ_LEVEL
* `SEATING_POSITION` - impute missing values with "Not known"
* `HELMET_BELT_WORN` - impute missing values with "Not known" 
* `ROAD_USER_TYPE` - no missing values
* `ROAD_USER_TYPE_DESC` - no missing values
* `LICENCE_STATE` - impute missing values with "Not known" and replace the other codes with meaningful descriptions
* `PEDEST_MOVEMENT` - impute missing values with "Not known" and replace the other codes with meaningful descriptions
* `POSTCODE` - drop the column, as 20% of its values are invalid postcodes or missing
* `TAKEN_HOSPITAL` - impute missing values with "Not known" and replace the other codes with meaningful descriptions
* `EJECTED_CODE` - impute missing values with "Not known" and replace the other codes with meaningful descriptions
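
Mode imputation, as planned for `INJ_LEVEL`, replaces each missing entry with the most frequent observed value. Below is a minimal plain-Python sketch that treats the single-space placeholder as missing. The helper names and toy codes are hypothetical. In PySpark, a comparable approach would be to find the mode with `groupBy().count()`, convert " " to null, and then apply `fillna(value, subset=[...])`.

```python
from collections import Counter


def is_blank(value):
    """Treat None and whitespace-only strings (the " " used in df_person) as missing."""
    return value is None or str(value).strip() == ""


def impute_mode(values):
    """Replace blank entries with the most frequent non-blank value (the mode)."""
    counts = Counter(v for v in values if not is_blank(v))
    if not counts:
        return list(values)  # no observed value to impute from
    mode = counts.most_common(1)[0][0]
    return [mode if is_blank(v) else v for v in values]


# Hypothetical INJ_LEVEL codes with two missing entries.
inj_level = ["4", "4", "3", " ", None]
print(impute_mode(inj_level))  # → ['4', '4', '3', '4', '4']
```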

<a class="anchor" id="explore_surface_cond"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> surface_cond</strong> </div>

In [None]:
#Summary Statistics : df_surface_cond
df_surface_cond.describe().toPandas()

In [None]:
# view some data
df_surface_cond.show(2,vertical=True,truncate=False)

In [None]:
# checking missing/null values : df_surface_cond
df_surface_cond.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_surface_cond.columns]).show(vertical=True,truncate=False)

In [None]:
# count distinct values for each column
df_surface_cond.select([countDistinct(c).alias(c) for c in df_surface_cond.columns]).show(vertical=True,truncate=False)

In [None]:
# check the frequency of each distinct value per column (a loop, since show() is called for its output only)
for c in df_surface_cond.columns:
    df_surface_cond.groupBy(c).count().sort(col(c)).show(truncate=False)

In [None]:
# check duplicate
df_surface_cond.groupBy("ACCIDENT_NO", "SURFACE_COND_DESC").count().filter("count > 1").show()

In [None]:
# check pairing values: SURFACE_COND & SURFACE_COND_DESC
df_surface_cond.groupBy("SURFACE_COND","SURFACE_COND_DESC").count().sort(col("SURFACE_COND")).show()

In [None]:
# check what sequence numbers are assigned to surface condition unknown
# to determine whether to transform this value
df_surface_cond.filter(col('SURFACE_COND') == 9)\
            .groupBy('SURFACE_COND_SEQ').count().show()

In [None]:
# check ACCIDENT_NO validity (foreign key)

# check whether ACCIDENT_NO in df_surface_cond exists in df_accident
invalid_accident_id = df_surface_cond.join(df_accident, df_surface_cond.ACCIDENT_NO == df_accident.ACCIDENT_NO, how='left_anti')\
                            .select(col('ACCIDENT_NO'))

# print the number of invalid ACCIDENT_NO
print("Invalid ACCIDENT_NO in df_surface_cond  :", invalid_accident_id.count())

# view a few invalid records
invalid_accident_id.show()

**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: No duplicate records.
* <strong> **Missing values/null** </strong>:   No missing values
* <strong> **Outliers** </strong>:  
* <strong> **Data Consistency/Validity** </strong>:  
* <strong> **Other** </strong>:  
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
One accident can have several surface conditions. To avoid duplicating accident information, this dataset needs to be converted from long to wide format, with each surface condition forming its own column.

<a class="anchor" id="explore_subdca"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> subdca</strong> </div>

In [None]:
#Summary Statistics :df_subdca
df_subdca.describe().toPandas()

In [None]:
# view some data
df_subdca.show(2,vertical=True,truncate=False)

In [None]:
#  checking missing/null values : df_subdca
df_subdca.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_subdca.columns]).show(vertical=True,truncate=False)

In [None]:
# count distinct values for each column
df_subdca.select([countDistinct(c).alias(c) for c in df_subdca.columns]).show(vertical=True,truncate=False)

In [None]:
# check the frequency of each distinct value per column (a loop, since show() is called for its output only)
for c in df_subdca.columns:
    df_subdca.groupBy(c).count().sort(col(c)).show(truncate=False)

In [None]:
# check duplicate
df_subdca.groupBy("ACCIDENT_NO", "SUB_DCA_CODE").count().filter("count > 1").show()

In [None]:
# check pairing values: SUB_DCA_CODE & SUB_DCA_CODE_DESC
df_subdca.groupBy("SUB_DCA_CODE","SUB_DCA_CODE_DESC").count().sort(col("SUB_DCA_CODE")).show()

Several `SUB_DCA_CODE` values have the description "Unknown".  
There are 121 `SUB_DCA_CODE`/description pairs, and one accident can have multiple `SUB_DCA_CODE`s. This makes pivoting the dataset from long to wide more complex.

In [None]:
# check ACCIDENT_NO validity (foreign key)

# check whether ACCIDENT_NO in df_subdca exists in df_accident
invalid_accident_id = df_subdca.join(df_accident, df_subdca.ACCIDENT_NO == df_accident.ACCIDENT_NO, how='left_anti')\
                            .select(col('ACCIDENT_NO'))

# print the number of invalid ACCIDENT_NO
print("Invalid ACCIDENT_NO in df_subdca  :", invalid_accident_id.count())

# view a few invalid records
invalid_accident_id.show()

**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: No duplicate records.
* <strong> **Missing values/null** </strong>: `SUB_DCA_CODE` has multiple codes with "Unknown" values.	 
* <strong> **Outliers** </strong>:  
* <strong> **Data Consistency/Validity** </strong>:  
* <strong> **Other** </strong>:  
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
One accident can have multiple `SUB_DCA_CODE`s (up to 6). 

To avoid duplicating accident information, this dataset would need to be converted from long to wide, with each `SUB_DCA_CODE` forming its own column. This is complex, as it could produce up to 120 columns, so the codes would need to be grouped and refined further if required.

Since `DCA_CODE` is also available in the accident dataset, the subdca dataset will not be used for modelling. No further action is required.

<a class="anchor" id="explore_vehicle"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> Vehicle</strong> </div>

In [None]:
#Summary Statistics - df_vehicle
df_vehicle.describe().toPandas()

In [None]:
# view some data
df_vehicle.show(2,vertical=True,truncate=False)

In [None]:
#  checking missing/null values : df_vehicle
df_vehicle.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_vehicle.columns]).show(vertical=True,truncate=False)

In [None]:
# count distinct values for each column
df_vehicle.select([countDistinct(c).alias(c) for c in df_vehicle.columns]).show(vertical=True,truncate=False)

In [None]:
# check the frequency of each distinct value per column (a loop, since show() is called for its output only)
for c in df_vehicle.columns:
    df_vehicle.groupBy(c).count().sort(col("count").desc()).show(truncate=False)

In [None]:
# check duplicate
df_vehicle.groupBy("ACCIDENT_NO", "VEHICLE_ID").count().filter("count > 1").show()

In [None]:
# check pairing values: ROAD_SURFACE_TYPE & ROAD_SURFACE_TYPE_DESC
df_vehicle.groupBy("ROAD_SURFACE_TYPE","ROAD_SURFACE_TYPE_DESC").count().sort(col("ROAD_SURFACE_TYPE")).show()

In [None]:
# check pairing values: VEHICLE_TYPE & VEHICLE_TYPE_DESC
df_vehicle.groupBy("VEHICLE_TYPE","VEHICLE_TYPE_DESC").count().sort(col("VEHICLE_TYPE")).show(truncate=False)

In [None]:
# check pairing values: TRAFFIC_CONTROL & TRAFFIC_CONTROL_DESC
df_vehicle.groupBy("TRAFFIC_CONTROL","TRAFFIC_CONTROL_DESC").count().sort(col("TRAFFIC_CONTROL")).show(truncate=False)

In [None]:
# check ACCIDENT_NO validity (foreign key)

# check whether ACCIDENT_NO in df_vehicle exists in df_accident
invalid_accident_id = df_vehicle.join(df_accident, df_vehicle.ACCIDENT_NO == df_accident.ACCIDENT_NO, how='left_anti')\
                            .select(col('ACCIDENT_NO'))

# print the number of invalid ACCIDENT_NO
print("Invalid ACCIDENT_NO in df_vehicle  :", invalid_accident_id.count())

# view a few invalid records
invalid_accident_id.show()

**<font color='blue'> _SUMMARY_ </font>**  
* <strong> **Duplicate** </strong>: No duplicate records.
* <strong> **Missing values/null** </strong>: Not all information is provided for Vehicle 2.   
* <strong> **Outliers** </strong>:  
* <strong> **Data Consistency/Validity** </strong>:  
* <strong> **Other** </strong>:  
    Vehicle-related text inputs (e.g. makes/brands) are not spelled consistently.
    
**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
- Provide descriptive labels for: 
    - `VEHICLE_DCA_CODE`, `LAMPS`, `INITIAL_IMPACT`, `CAUGHT_FIRE`, `FUEL_TYPE`, `LEVEL_OF_DAMAGE`, `TRAILER_TYPE`, `DRIVER_INTENT`, `VEHICLE_MOVEMENT`
- Update the vehicle make to the most common value where the intended make is obvious (assumed)
- Reorder columns so that each code sits next to its description column
- Drop columns that will not be used for analysis/modelling because they are too incomplete
- Sort data by `ACCIDENT_NO`
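
The "most common value" rule for vehicle makes can be sketched as follows. Spellings are grouped by a normalised key (upper case, letters and digits only), and each spelling is mapped to the most frequent raw spelling in its group. This is a minimal plain-Python sketch. The normalisation rule, helper names and toy makes are assumptions for illustration.

```python
from collections import Counter, defaultdict


def canonical_key(text):
    """Group key: upper case with spaces and punctuation removed."""
    return "".join(ch for ch in text.upper() if ch.isalnum())


def unify_spellings(values):
    """Map each value to the most common raw spelling among values sharing its key."""
    groups = defaultdict(Counter)
    for v in values:
        groups[canonical_key(v)][v] += 1
    best = {key: counter.most_common(1)[0][0] for key, counter in groups.items()}
    return [best[canonical_key(v)] for v in values]


# Hypothetical toy makes with inconsistent case and spacing.
makes = ["TOYOTA", "TOYOTA", "Toyota ", "HOLDEN", "HOLDEN", "holden"]
print(unify_spellings(makes))
```

This only merges variants that differ in case, spacing or punctuation. Genuine misspellings (e.g. a dropped letter) would still need a manual mapping or a fuzzy-matching step.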


[Back to Table of Contents](#toc)

<a class="anchor" id="data_transformation"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h2>Cleaning & Transformation</h2>
<hr/>
This section cleans and transforms the relevant individual datasets. The final output is a single cleaned dataset to be used in the next phase of the assignment. The work is split into three parts: <br>
 <br> - <b> Part 1 </b> : Clean and Transform individual dataframe/dataset                
 <br> - <b> Part 2 </b> : Join all the clean dataframes into a joined dataframe    
 <br> - <b> Part 3 </b> : Perform exploration, cleaning & transformation on the joined dataframe.   
</div>

<a class="anchor" id="transform_part1"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h3>Part 1: Clean and Transform individual dataframe/dataset </h3>
</div>

<a class="anchor" id="transform_atmosp_cond"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> atmospheric_cond </strong> </div>

**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
* One accident can have multiple atmospheric conditions: pivot the table from long to wide by `ACCIDENT_NO`, so that each atmospheric condition forms its own column  
* Sort by `ACCIDENT_NO`

In [None]:
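# pivot: one row per ACCIDENT_NO and one count column per ATMOSPH_COND_DESC (null where the condition is absent),
# then map each count to a Yes/No flag and drop the raw pivot columns
ndf_atmospheric_cond = df_atmospheric_cond.groupBy('ACCIDENT_NO').pivot('ATMOSPH_COND_DESC').count()\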
            .withColumn('ATMOSPH_CLEAR', when(col('Clear') == 1, "Yes").otherwise('No'))\
            .withColumn('ATMOSPH_RAINING', when(col('Raining') == 1, "Yes").otherwise('No'))\
            .withColumn('ATMOSPH_SNOWING', when(col('Snowing') == 1, "Yes").otherwise('No'))\
            .withColumn('ATMOSPH_FOG', when(col('Fog') == 1, "Yes").otherwise('No'))\
            .withColumn('ATMOSPH_SMOKE', when(col('Smoke') == 1, "Yes").otherwise('No'))\
            .withColumn('ATMOSPH_DUST', when(col('Dust') == 1, "Yes").otherwise('No'))\
            .withColumn('ATMOSPH_STRONG_WINDS', when(col('Strong winds') == 1, "Yes").otherwise('No'))\
            .withColumn('ATMOSPH_UNKNOWN', when(col('Not known') == 1, "Yes").otherwise('No'))\
            .drop('Clear', 'Raining','Snowing','Fog','Smoke','Dust','Strong winds', 'Not known')\
            .sort("ACCIDENT_NO")\
            .cache()

In [None]:
# check number of records & number of columns after transformation
ndf_atmospheric_cond_count = ndf_atmospheric_cond.count()
ndf_atmospheric_cond_len = len(ndf_atmospheric_cond.columns)
print("Number of Records: ", ndf_atmospheric_cond_count, " Number of columns: ", ndf_atmospheric_cond_len)

In [None]:
# check result
ndf_atmospheric_cond.show(2,vertical=True,truncate=False)

<a class="anchor" id="transform_surface_cond"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> surface_cond</strong> </div>

**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
- Remove the surface condition code/sequence columns and pivot the data so that all surface conditions for an accident appear on a single row, as per the metadata information 
- Sort by `ACCIDENT_NO` to allow any joining operation later

In [None]:
# One accident has multiple surface conditions
# Pivot the table from long to wide
ndf_surface_cond = df_surface_cond.groupBy('ACCIDENT_NO').pivot('SURFACE_COND_DESC').count()\
            .withColumn('SURFACE_DRY', when(col('Dry') == 1, "Yes").otherwise('No'))\
            .withColumn('SURFACE_WET', when(col('Wet') == 1, "Yes").otherwise('No'))\
            .withColumn('SURFACE_MUDDY', when(col('Muddy') == 1, "Yes").otherwise('No'))\
            .withColumn('SURFACE_SNOWY', when(col('Snowy') == 1, "Yes").otherwise('No'))\
            .withColumn('SURFACE_ICY', when(col('Icy') == 1, "Yes").otherwise('No'))\
            .withColumn('SURFACE_UNKNOWN', when(col('Unknown') == 1, "Yes").otherwise('No'))\
            .drop('Dry', 'Wet','Muddy','Snowy','Icy','Unknown').sort("ACCIDENT_NO")\
            .cache()

In [None]:
# check number of records & number of columns after transformation
ndf_surface_cond_count = ndf_surface_cond.count()
ndf_surface_cond_len = len(ndf_surface_cond.columns)
print("Number of Records: ", ndf_surface_cond_count, " Number of columns: ", ndf_surface_cond_len)

In [None]:
# check result
ndf_surface_cond.show(2,vertical=True,truncate=False)

<a class="anchor" id="transform_person"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> person </strong> </div>

**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
* Trim leading and trailing whitespace from all columns.
* Fill missing values of `AGE` with the average age (`avg_age`).
* Fill missing values of `INJ_LEVEL` with the most frequent injury level (`mode_inj_lvl`).
* `AGE_GROUP` - group ages into sensible age ranges.
* `LICENCE_STATE` - group states into broader categories (Victoria, Interstate, Commonwealth, Overseas) and fill missing values with "Not known".
* `SEX` - fill blank values with "Not known" and replace codes with meaningful descriptions.
* `HELMET_BELT_WORN` - fill missing values with "Not known" and replace codes with meaningful descriptions (`HELMET_BELT_WORN_DESC`).
* `SEATING_POSITION` - fill missing values with "Not known" and replace codes with meaningful descriptions (`SEATING_POSITION_DESC`).
* `EJECTED_CODE` - fill missing values with "Not known" and replace codes with meaningful descriptions.
* `TAKEN_HOSPITAL` - fill missing values with "Not known" and replace codes with meaningful descriptions.
* `INJ_LEVEL_DESC` - fill missing values with "Not known" and replace codes with meaningful descriptions.
* `PEDEST_MOVEMENT` - fill missing values with "Not known" and replace codes with meaningful descriptions (`PEDEST_MOVEMENT_DESC`).
* Select only the columns required for the next part of processing.
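
The `AGE` fill and `AGE_GROUP` rules amount to a small lookup table. The plain-Python sketch below is only an illustration: `AGE_BANDS` and `age_group` are hypothetical names, bounds are inclusive as with `Column.between`, and the fallback age of 37 is just an example value (the notebook computes it as `avg_age`). Ages outside every band are returned unchanged, mirroring `.otherwise(col("AGE"))`.

```python
# (lower, upper, label) bands copied from the AGE_GROUP expression; upper=None means open-ended
AGE_BANDS = [
    (0, 4, "0-4"),
    (5, 17, "5-17"),
    (18, 24, "18-24"),
    (25, 34, "25-34"),
    (35, 44, "35-44"),
    (45, 54, "45-54"),
    (55, 64, "55-64"),
    (65, 69, "65-69"),
    (70, None, "70+"),
]


def age_group(age, fallback_age):
    """Fill a missing age with fallback_age, then return its band label (inclusive bounds)."""
    if age is None:
        age = fallback_age
    for lower, upper, label in AGE_BANDS:
        if age >= lower and (upper is None or age <= upper):
            return label
    return str(age)  # out-of-range ages (e.g. negative) are kept as-is


print(age_group(None, 37), age_group(24, 37), age_group(70, 37))
```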

In [None]:
# Set mode value for INJ_LEVEL (manage missing values): the most frequent non-null level
mode_inj_lvl = df_person.filter(col("INJ_LEVEL").isNotNull())\
                        .groupBy("INJ_LEVEL").agg(F.count(col("INJ_LEVEL")).alias('count_INJ_LEVEL'))\
                        .sort(col('count_INJ_LEVEL').desc())\
                        .first()['INJ_LEVEL']
print(mode_inj_lvl)
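
The mode is the most frequent value, which is not necessarily the largest code. A plain-Python check on hypothetical `INJ_LEVEL` values makes the distinction concrete. `mode_ignoring_null` is an illustrative helper, and its tie-break (smallest code wins) is an assumption added for determinism.

```python
from collections import Counter


def mode_ignoring_null(values):
    """Most frequent non-null value; ties are broken by the smallest value."""
    counts = Counter(v for v in values if v is not None)
    best_count = max(counts.values())
    return min(v for v, c in counts.items() if c == best_count)


# Hypothetical INJ_LEVEL codes, read as strings from CSV
inj_levels = ["4", "3", "3", "2", "3", None, "4"]
mode_level = mode_ignoring_null(inj_levels)               # most frequent code
largest_level = max(v for v in inj_levels if v is not None)  # what sorting by the value itself returns
print(mode_level, largest_level)
```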

In [None]:
# Set average age (manage missing values)
# Note: the average age by sex is about 37 for females and 38 for males; both fall into the same AGE_GROUP (35-44).
# For simplicity, the overall average age is used.
avg_age = df_person.agg(F.round(F.avg(col("AGE"))).cast('integer').alias('average_age')).first()['average_age']
print(avg_age)

In [None]:
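# perform all data cleaning and transformation as specified above
# note: trim() returns strings, so blank strings are treated as missing and AGE is cast back to an integer
ndf_person = df_person.select([trim(col(c)).alias(c) for c in df_person.columns])\
                    .withColumn("AGE",
                         when(col('AGE').isNull() | (col('AGE') == ""), avg_age)\
                        .otherwise(col('AGE').cast('integer')))\
                    .withColumn("INJ_LEVEL",
                         when(col("INJ_LEVEL").isNull() | (col("INJ_LEVEL") == ""), mode_inj_lvl)\
                        .otherwise(col("INJ_LEVEL")))\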
                    .withColumn("AGE_GROUP",
                            when(col("AGE").between(0,4), "0-4")\
                           .when(col("AGE").between(5,17), "5-17")\
                           .when(col("AGE").between(18,24), "18-24")\
                           .when(col("AGE").between(25,34), "25-34")\
                           .when(col("AGE").between(35,44), "35-44")\
                           .when(col("AGE").between(45,54), "45-54")\
                           .when(col("AGE").between(55,64), "55-64")\
                           .when(col("AGE").between(65,69), "65-69")\
                           .when(col("AGE") > 69, "70+")\
                           .otherwise(col("AGE")))\
                    .withColumn("LICENCE_STATE",
                         when(col("LICENCE_STATE").isin("A","D","N","Q","S","T","W"), "Interstate")\
                        .when(col("LICENCE_STATE") == "B", "Commonwealth")\
                        .when(col("LICENCE_STATE") == "O", "Overseas")\
                        .when(col("LICENCE_STATE") == "V", "Victoria")\
                        .otherwise("Not known"))\
                    .withColumn("SEX", 
                         when(col('SEX') == 'F',"Female")\
                        .when(col('SEX') == 'M',"Male")\
                        .otherwise("Not known"))\
                    .withColumn("HELMET_BELT_WORN_DESC", 
                         when(col('HELMET_BELT_WORN') == 1,"Seatbelt worn")\
                        .when(col('HELMET_BELT_WORN') == 2,"Seatbelt not worn")\
                        .when(col('HELMET_BELT_WORN') == 3,"Child restraint worn")\
                        .when(col('HELMET_BELT_WORN') == 4,"Child restraint not worn")\
                        .when(col('HELMET_BELT_WORN') == 5,"Seatbelt/restraint not fitted")\
                        .when(col('HELMET_BELT_WORN') == 6,"Crash helmet worn")\
                        .when(col('HELMET_BELT_WORN') == 7,"Crash helmet not worn")\
                        .when(col('HELMET_BELT_WORN') == 8,"Not appropriate")\
                        .otherwise("Not known"))\
                    .withColumn("SEATING_POSITION_DESC", 
                         when(col('SEATING_POSITION') == 'CF',"Centre-front")\
                        .when(col('SEATING_POSITION') == 'CR',"Centre-rear")\
                        .when(col('SEATING_POSITION') == 'D' ,"Driver or rider")\
                        .when(col('SEATING_POSITION') == 'LF',"Left-front")\
                        .when(col('SEATING_POSITION') == 'LR',"Left-rear")\
                        .when(col('SEATING_POSITION') == 'NA',"Not applicable")\
                        .when(col('SEATING_POSITION') == 'NK',"Not known")\
                        .when(col('SEATING_POSITION') == 'OR',"Other-rear")\
                        .when(col('SEATING_POSITION') == 'PL',"Pillion passenger")\
                        .when(col('SEATING_POSITION') == 'PS',"Motor-cycle side car passenger")\
                        .when(col('SEATING_POSITION') == 'RR',"Right-rear")\
                        .otherwise("Not known"))\
                    .withColumn("EJECTED_CODE", 
                         when(col("EJECTED_CODE") == "0", "Not applicable")\
                        .when(col("EJECTED_CODE") == "1", "Total ejected")\
                        .when(col("EJECTED_CODE") == "2", "Partially ejected")\
                        .when(col("EJECTED_CODE") == "3", "Partial ejection involving extraction")\
                        .otherwise("Not known"))\
                    .withColumn("TAKEN_HOSPITAL",
                         when(col("TAKEN_HOSPITAL") == "Y","Yes")\
                        .when(col("TAKEN_HOSPITAL") == "N", "No")\
                        .otherwise("Not known"))\
                    .withColumn("INJ_LEVEL_DESC", 
                         when(col('INJ_LEVEL') == "4", "4-Not Injured")\
                        .when(col('INJ_LEVEL') == "3", "3-Other Injury")\
                        .when(col('INJ_LEVEL') == "2", "2-Serious Injury")\
                        .when(col('INJ_LEVEL') == "1", "1-Fatality")\
                        .otherwise("Not known"))\
                    .withColumn("PEDEST_MOVEMENT_DESC",
                         when(col("PEDEST_MOVEMENT") == "0", "Not applicable")\
                        .when(col("PEDEST_MOVEMENT") == "1", "Crossing carriageway")\
                        .when(col("PEDEST_MOVEMENT") == "2", "Working/playing/lying or standing on carriageway")\
                        .when(col("PEDEST_MOVEMENT") == "3", "Walking on carriageway with traffic")\
                        .when(col("PEDEST_MOVEMENT") == "4", "Walking on carriageway against traffic")\
                        .when(col("PEDEST_MOVEMENT") == "5", "Pushing or working on vehicle")\
                        .when(col("PEDEST_MOVEMENT") == "6", "Walking to/from or boarding tram")\
                        .when(col("PEDEST_MOVEMENT") == "7", "Walking to/from or boarding other vehicle")\
                        .when(col("PEDEST_MOVEMENT") == "8", "Not on carriageway")\
                        .otherwise("Not known"))\
                    .select("ACCIDENT_NO","PERSON_ID", "VEHICLE_ID","SEX","AGE_GROUP","LICENCE_STATE",
                            "INJ_LEVEL_DESC","TAKEN_HOSPITAL","EJECTED_CODE",
                            "ROAD_USER_TYPE_DESC","SEATING_POSITION_DESC","HELMET_BELT_WORN_DESC",
                            "PEDEST_MOVEMENT_DESC")\
                    .cache()


In [None]:
# check number of records & number of columns after transformation
ndf_person_count = ndf_person.count()
ndf_person_len = len(ndf_person.columns)
print("Number of Records: ", ndf_person_count, " Number of columns: ", ndf_person_len)

In [None]:
# check result
ndf_person.show(2,vertical=True,truncate=False)

<a class="anchor" id="transform_node"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> node </strong> </div>

**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
* Remove the special characters `(`, `)`, `.` and `*` from `LGA_NAME` and `LGA_NAME_ALL`.
* `LGA_NAME_ALL`, `LGA_NAME`, `REGION_NAME`, `DEG_URBAN_NAME` - fill empty values and standardise the descriptions, using the most frequent combination recorded for each postcode.
* Remove duplicate entries where `ACCIDENT_NO` and `NODE_ID` are the same.
* Removing the duplicates also removes the entries with missing values.
* Update `NODE_TYPE` with more descriptive labels.
* Select only the columns required for the next part of processing.

In [None]:
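# remove the special characters "(", ")", "." and "*" from LGA_NAME and LGA_NAME_ALL
# (raw strings keep the regex escapes intact; inside [...] these characters are literals)
ndf_node_temp1 = df_node.withColumn("LGA_NAME", F.regexp_replace(col("LGA_NAME"), r"[\(.*\)]", ""))\
    .withColumn("LGA_NAME_ALL", F.regexp_replace(col("LGA_NAME_ALL"), r"[\(.*\)]", ""))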
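
The pattern `[\(.*\)]` is a character class, so it deletes each `(`, `)`, `.` and `*` character individually. It does not remove the text between the brackets. The sketch below uses Python's `re`, which treats this simple class the same way as the Java regex engine behind Spark's `regexp_replace`. The LGA-style names and the `strip_special` helper are made-up examples.

```python
import re

# Same character class as the regexp_replace call: matches one "(", ".", "*" or ")" at a time
SPECIAL_CHARS = r"[\(.*\)]"


def strip_special(name):
    """Delete every special character; the words inside brackets are kept."""
    return re.sub(SPECIAL_CHARS, "", name)


print(strip_special("ALPINE (RESORT)"))
print(strip_special("ST. KILDA"))
```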

In [None]:
# Create Views from Dataframes
ndf_node_temp1.createOrReplaceTempView("sql_node")
ndf_node_temp1.createOrReplaceTempView("sql_node_temp")

# Update the descriptions of REGION_NAME, LGA_NAME, LGA_NAME_ALL and DEG_URBAN_NAME based on postcode,
# assuming the most frequently recorded combination for each postcode is the correct mapping
ndf_node_temp2 = spark.sql('''
    SELECT n.ACCIDENT_NO, n.NODE_ID, n.NODE_TYPE, n.VICGRID94_X, n.VICGRID94_Y,
            p.LGA_NAME, p.LGA_NAME_ALL, p.REGION_NAME, p.DEG_URBAN_NAME, 
            n.Lat, n.Long, n.POSTCODE_NO
    FROM sql_node_temp n INNER JOIN 
        (SELECT POSTCODE_NO, REGION_NAME, LGA_NAME, LGA_NAME_ALL, DEG_URBAN_NAME
        FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY POSTCODE_NO ORDER BY Rank) as row_num
            FROM  (SELECT POSTCODE_NO, REGION_NAME, LGA_NAME, LGA_NAME_ALL, DEG_URBAN_NAME,
                    count(*), 
                    rank() OVER(PARTITION BY POSTCODE_NO ORDER BY count(*) DESC) as Rank                    
                  FROM sql_node
                  GROUP BY POSTCODE_NO, REGION_NAME, LGA_NAME, LGA_NAME_ALL, DEG_URBAN_NAME
                  ORDER BY POSTCODE_NO)
            WHERE Rank = 1)
    WHERE row_num = 1) p
    ON n.POSTCODE_NO =  p.POSTCODE_NO
''')
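
The SQL picks, for every postcode, the most frequently recorded combination of descriptive columns and joins it back onto each node. A plain-Python sketch of the same idea follows. The postcodes and region/LGA values are placeholders, and `most_frequent_mapping` is a hypothetical helper. Ties are resolved by first occurrence, as `Counter.most_common` documents, whereas `ROW_NUMBER()` over equally ranked rows picks an arbitrary one.

```python
from collections import Counter, defaultdict


def most_frequent_mapping(rows):
    """Map each postcode to its most frequently recorded tuple of descriptive columns.

    rows: iterable of (postcode, descriptions) pairs. Ties go to the tuple seen first.
    """
    counts = defaultdict(Counter)
    for postcode, descriptions in rows:
        counts[postcode][descriptions] += 1
    return {postcode: c.most_common(1)[0][0] for postcode, c in counts.items()}


# Placeholder (postcode, (REGION_NAME, LGA_NAME)) rows
rows = [
    ("3000", ("REGION A", "LGA X")),
    ("3000", ("", "LGA X")),          # blank region on one record
    ("3000", ("REGION A", "LGA X")),
    ("3999", ("REGION B", "LGA Y")),
    ("3999", ("REGION C", "LGA Y")),  # tie: the first combination seen wins
]
mapping = most_frequent_mapping(rows)
# Attach the chosen descriptions back onto every record, like the join on POSTCODE_NO
cleaned = [(postcode, mapping[postcode]) for postcode, _ in rows]
```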


In [None]:
# check the number of records before & after description is updated
# the number of records should be the same
print("Number of records - BEFORE: ", df_shape_initial.filter((col('DATAFRAME') == "df_node") & (col("SHAPE") == "Number of Records")).first()["INITIAL RESULTS"], "| AFTER: ", ndf_node_temp2.count())

In [None]:
# check the result of the postcode-based mapping
ndf_node_temp2.show(2,vertical=True,truncate=False)

In [None]:
# remove duplicate rows per (NODE_ID, ACCIDENT_NO); note that dropDuplicates keeps an arbitrary row
# for each key, not necessarily the first one
# add a description for NODE_TYPE
ndf_node = ndf_node_temp2.dropDuplicates(["NODE_ID","ACCIDENT_NO"])\
                    .withColumn("NODE_TYPE_DESC", 
                                when(col('NODE_TYPE') == 'I',"Intersection")\
                                .when(col('NODE_TYPE') == 'N',"Non-intersection")\
                                .when(col('NODE_TYPE') == 'O',"Off-road")\
                                .when(col('NODE_TYPE') == 'U',"Unknown")\
                                .otherwise("Unknown"))\
                    .select("ACCIDENT_NO","NODE_ID","NODE_TYPE_DESC","LGA_NAME","REGION_NAME","DEG_URBAN_NAME")\
                    .cache()

In [None]:
# check number of records & number of columns after transformation
ndf_node_count = ndf_node.count()
ndf_node_len = len(ndf_node.columns)
print("Number of Records: ", ndf_node_count, " Number of columns: ", ndf_node_len)

In [None]:
# check result
ndf_node.show(2,vertical=True,truncate=False)

In [None]:
# check duplicate records after transformation
ndf_node.groupby("NODE_ID","ACCIDENT_NO").agg(count(col("ACCIDENT_NO")).alias('num_of_accidents')).sort(col("NODE_ID"))\
        .filter(col('num_of_accidents') > 1).show()

<a class="anchor" id="transform_accident"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> accident</strong> </div>

**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**   
- Trim leading and trailing whitespace from all columns.  
- Update `POLICE_ATTEND` with descriptive values (Yes/No/Not known), based on the metadata reference.  
- Add `SEVERITY_DESC` to describe `SEVERITY`.   
- Add `SPEED_ZONE_DESC` to describe `SPEED_ZONE`, grouping adjacent speed limits.  
- Create `ACCIDENT_MONTH` from `ACCIDENTDATE`.  
- Create `ACCIDENT_HOUR_GROUP` from `ACCIDENTTIME`: MORNING PEAK (7am-9am), EVENING PEAK (4pm-7pm), otherwise OFF-PEAK. 
- Select only the columns required for the next part of processing. 
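
The two derived time fields can be checked in plain Python. The sketch assumes `ACCIDENTTIME` looks like `HH.MM.SS` and `ACCIDENTDATE` like `DD/MM/YYYY`, which is what `substring(..., 1, 2)` and the `/(\d*)/` pattern expect. The function names are hypothetical. The upper bounds are exclusive, so 08:59 is morning peak but 09:00 is off-peak, and 18:59 is evening peak but 19:00 is off-peak.

```python
import re


def hour_group(accident_time):
    """Classify an 'HH.MM.SS' time: [7, 9) morning peak, [16, 19) evening peak, else off-peak."""
    hour = int(accident_time[:2])  # same as substring(ACCIDENTTIME, 1, 2)
    if 7 <= hour < 9:
        return "MORNING PEAK"
    if 16 <= hour < 19:
        return "EVENING PEAK"
    return "OFF-PEAK"


def accident_month(accident_date):
    """Extract the digits between the two slashes of 'DD/MM/YYYY'; None if absent."""
    match = re.search(r"/(\d*)/", accident_date)
    return int(match.group(1)) if match and match.group(1) else None


print(hour_group("08.59.00"), hour_group("09.00.00"), accident_month("13/01/2014"))
```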

In [None]:
# perform the cleaning and transformation activities listed above
ndf_accident = df_accident.select([trim(col(c)).alias(c) for c in df_accident.columns])\
                        .withColumn("POLICE_ATTEND", 
                             when(col('POLICE_ATTEND') == 1,"Yes")\
                            .when(col('POLICE_ATTEND') == 2,"No")\
                            .when(col('POLICE_ATTEND') == 9,"Not known")\
                            .otherwise("Not known"))\
                        .withColumn("SEVERITY_DESC", 
                             when(col('SEVERITY') == 1,"1-Fatal accident")\
                            .when(col('SEVERITY') == 2,"2-Serious injury accident")\
                            .when(col('SEVERITY') == 3,"3-Other injury accident")\
                            .when(col('SEVERITY') == 4,"4-Non injury accident")\
                            .otherwise("4-Non injury accident"))\
                        .withColumn("SPEED_ZONE_DESC", 
                             when(col('SPEED_ZONE') == '030',"30-40 km/hr")\
                            .when(col('SPEED_ZONE') == '040',"30-40 km/hr")\
                            .when(col('SPEED_ZONE') == '050',"50 km/hr")\
                            .when(col('SPEED_ZONE') == '060',"60 km/hr")\
                            .when(col('SPEED_ZONE') == '070',"70-75 km/hr")\
                            .when(col('SPEED_ZONE') == '075',"70-75 km/hr")\
                            .when(col('SPEED_ZONE') == '080',"80-90 km/hr")\
                            .when(col('SPEED_ZONE') == '090',"80-90 km/hr")\
                            .when(col('SPEED_ZONE') == '100',"100-110 km/hr")\
                            .when(col('SPEED_ZONE') == '110',"100-110 km/hr")\
                            .when(col('SPEED_ZONE') == '777',"Other/Not Known")\
                            .when(col('SPEED_ZONE') == '888',"Other/Not Known")\
                            .when(col('SPEED_ZONE') == '999',"Other/Not Known")\
                            .otherwise("Other/Not Known"))\
                        .withColumn("ACCIDENT_HOUR", substring(col("ACCIDENTTIME"),1,2).cast(IntegerType()))\
                        .withColumn("ACCIDENT_MONTH", regexp_extract(col("ACCIDENTDATE"),r'\/(\d*)\/',1).cast(IntegerType()))\
                        .withColumn("ACCIDENT_HOUR_GROUP", 
                                    when((( col("ACCIDENT_HOUR") >= 7) & (col("ACCIDENT_HOUR") < 9)),"MORNING PEAK")\
                                    .when((( col("ACCIDENT_HOUR") >= 16) & (col("ACCIDENT_HOUR") < 19)),"EVENING PEAK")\
                                    .otherwise("OFF-PEAK"))\
                        .select("ACCIDENT_NO","ACCIDENTDATE","ACCIDENT_MONTH","ACCIDENT_HOUR_GROUP","DAY_OF_WEEK_DESC",
                                "SEVERITY_DESC", "ACCIDENT_TYPE_DESC","DCA_DESC","LIGHT_CONDITION_DESC","ROAD_GEOMETRY_DESC",
                                "POLICE_ATTEND","SPEED_ZONE_DESC","NODE_ID")\
                        .cache()
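
The long `SPEED_ZONE_DESC` chain is a code-to-band lookup with a default. As a plain-Python sketch with hypothetical names, the same rule fits in a dictionary. The lookup is an exact string match, so it assumes zero-padded codes such as `"030"`. Anything not listed, including 777/888/999 and blanks, falls back to "Other/Not Known".

```python
# Code -> band, copied from the SPEED_ZONE_DESC expression above
SPEED_ZONE_BANDS = {
    "030": "30-40 km/hr", "040": "30-40 km/hr",
    "050": "50 km/hr", "060": "60 km/hr",
    "070": "70-75 km/hr", "075": "70-75 km/hr",
    "080": "80-90 km/hr", "090": "80-90 km/hr",
    "100": "100-110 km/hr", "110": "100-110 km/hr",
}


def speed_zone_desc(code):
    """Trim the code and look it up; unknown, missing or unexpected codes get the default."""
    key = code.strip() if code else ""
    return SPEED_ZONE_BANDS.get(key, "Other/Not Known")


print(speed_zone_desc("030"), speed_zone_desc("999"))
```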



In [None]:
# check number of records & number of columns after transformation
ndf_accident_count = ndf_accident.count()
ndf_accident_len = len(ndf_accident.columns)
print("Number of Records: ", ndf_accident_count, " Number of columns: ", ndf_accident_len)

In [None]:
# check result
ndf_accident.show(2,vertical=True,truncate=False)

<a class="anchor" id="transform_vehicle"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:10px"><strong> Dataset: </strong> <strong style="color:blue"> vehicle </strong> </div>

**<font color='blue'> _CLEANING & TRANSFORMATION ACTIVITIES_ </font>**  
- Provide descriptions for coded fields where available from https://data.vicroads.vic.gov.au/Metadata/Crash%20Stats%20-%20Data%20Extract%20-%20Open%20Data.html
    - `VEHICLE_DCA_CODE`, `LAMPS`, `INITIAL_IMPACT`, `CAUGHT_FIRE`, `FUEL_TYPE`,
    - `LEVEL_OF_DAMAGE`, `TRAILER_TYPE`, `DRIVER_INTENT`, `VEHICLE_MOVEMENT`, `VEHICLE_COLOUR_1`, `VEHICLE_COLOUR_2`
- Group `REG_STATE` into `REG_STATE_GRP` (Victoria, Interstate, Commonwealth, Overseas or Not known).
- Create a grouped `VEHICLE_YEAR_MANUF` for vehicles <5 years old, 5-10 years old, 10-20 years old, 20-30 years old and 30+ years old; a year of 0 or later than 2020 is treated as "Not known".
- Correct `VEHICLE_MAKE` spellings to the most common variant where the intended make is obvious (assumed), then keep the makes in the top 15 frequency ranks and label the rest "OTHER MAKE".
- Create `VEHICLE_CHANGED_DIRECTION` from the `INITIAL_DIRECTION`/`FINAL_DIRECTION` columns.
- Reorder columns to align codes with their description columns.
- Rename columns (remove spaces and capitalise).
- Drop columns we will not use for analysis/modelling due to lack of completeness in the data set:
    - INITIAL_DIRECTION, TOWED_AWAY_FLAG, VEHICLE_MODEL, CONSTRUCTION_TYPE, VEHICLE_WEIGHT, TARE_WEIGHT, CARRY_CAPACITY, CUBIC_CAPCITY, FINAL_DIRECTION
- Sort data by `ACCIDENT_NO`.
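
Two of the vehicle rules are easy to get subtly wrong, so here is a plain-Python sketch of each under stated assumptions. The function names are hypothetical. `changed_direction` checks for `NK` or missing directions before comparing, so an `NK`/`NK` pair is reported as "Not known" rather than "No". `group_top_makes` mirrors the `dense_rank` approach: it keeps every make whose frequency is among the `top_n` highest distinct counts, so ties can keep more than `top_n` makes. Unknown or blank makes are excluded from the ranking explicitly.

```python
from collections import Counter


def changed_direction(initial, final):
    """'Not known' takes precedence over the equality test."""
    if initial in (None, "NK") or final in (None, "NK"):
        return "Not known"
    return "No" if initial == final else "Yes"


def group_top_makes(makes, top_n=15, unknown=("UNKN", "")):
    """Keep makes whose count is among the top_n distinct counts (like dense_rank <= top_n)."""
    cleaned = [m.strip() for m in makes]
    counts = Counter(m for m in cleaned if m not in unknown)
    top_counts = sorted(set(counts.values()), reverse=True)[:top_n]
    keep = {m for m, c in counts.items() if c in top_counts}
    return [m if m in keep else "OTHER MAKE" for m in cleaned]


# Hypothetical padded make codes; TOYOTA and FORD tie on rank 1
makes = ["TOYOTA", "TOYOTA", "HOLDEN", "FORD  ", "UNKN  ", "FORD"]
print(group_top_makes(makes, top_n=1))
```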


In [None]:
# Cleanup as per above notes

ndf_vehicle = df_vehicle.withColumn("REG_STATE_GRP", 
                                    when(col('REG_STATE').isin("A","D","N","Q","S","T","W"), "Interstate")\
                                    .when(col('REG_STATE') == "B", "Commonwealth")\
                                    .when(col('REG_STATE') == "O","Overseas")\
                                    .when(col('REG_STATE') == "V","Victoria")\
                                    .when(col('REG_STATE') == "Z","Not known")\
                                    .otherwise("Not known"))\
                        .withColumn("VEHICLE_DCA_DESC", 
                                 when(col('VEHICLE_DCA_CODE') == 1,"Vehicle 1")\
                                .when(col('VEHICLE_DCA_CODE') == 2,"Vehicle 2")\
                                .when(col('VEHICLE_DCA_CODE') == 3,"Not known which vehicle was number 1")\
                                .when(col('VEHICLE_DCA_CODE') == 8,"Not involved in initial event")\
                                .otherwise("Not known"))\
                        .withColumn("VEHICLE_YEAR_MANUF", when(col('VEHICLE_YEAR_MANUF') == 0,"Not known")\
                                    .when(col('VEHICLE_YEAR_MANUF') > 2020 ,"Not known")\
                                   .otherwise(df_vehicle["VEHICLE_YEAR_MANUF"]))\
                        .withColumn("LAMPS_ON", 
                                     when(col('LAMPS') == 0,"Not Applicable")\
                                    .when(col("LAMPS") == 1,"Yes")\
                                    .when(col("LAMPS") == 2,"No")\
                                    .otherwise("Not known"))\
                        .withColumn("INITIAL_IMPACT_DESC", 
                                     when(col('INITIAL_IMPACT') == 0,"Towed Unit")\
                                    .when(col("INITIAL_IMPACT") == 1,"Right front corner")\
                                    .when(col("INITIAL_IMPACT") == 2,"Right side forwards")\
                                    .when(col("INITIAL_IMPACT") == 3,"Right side rearwards")\
                                    .when(col("INITIAL_IMPACT") == 4,"Right rear corner")\
                                    .when(col("INITIAL_IMPACT") == 5,"Left front corner")\
                                    .when(col("INITIAL_IMPACT") == 6,"Left side forwards")\
                                    .when(col("INITIAL_IMPACT") == 7,"Left side rearwards")\
                                    .when(col("INITIAL_IMPACT") == 8,"Left rear corner")\
                                    .when(col("INITIAL_IMPACT") == 9,"Not Known/Not Applicable")\
                                    .when(col("INITIAL_IMPACT") == "F","Front")\
                                    .when(col("INITIAL_IMPACT") == "N","None")\
                                    .when(col("INITIAL_IMPACT") == "R","Sidecar")\
                                    .when(col("INITIAL_IMPACT") == "T","Top/roof")\
                                    .when(col("INITIAL_IMPACT") == "U","Undercarriage")\
                                    .otherwise("Not known"))\
                        .withColumn("CAUGHT_FIRE", 
                                     when(col('CAUGHT_FIRE') == 0,"Not Applicable")\
                                    .when(col("CAUGHT_FIRE") == 1,"Yes")\
                                    .when(col("CAUGHT_FIRE") == 2,"No")\
                                    .otherwise("Not known"))\
                        .withColumn("FUEL_TYPE_DESC", 
                                     when(col("FUEL_TYPE") == "D","Diesel")\
                                    .when(col("FUEL_TYPE") == "E", "Electric")\
                                    .when(col("FUEL_TYPE") == "G", "Gas")\
                                    .when(col("FUEL_TYPE") == "M", "Multi")\
                                    .when(col("FUEL_TYPE") == "P", "Petrol")\
                                    .when(col("FUEL_TYPE") == "R", "Rotary")\
                                    .otherwise("Unknown"))\
                        .withColumn("LEVEL_OF_DAMAGE_DESC", 
                                     when(col("LEVEL_OF_DAMAGE") == 1,"Minor")\
                                    .when(col("LEVEL_OF_DAMAGE") == 2,"Moderate (driveable vehicle)")\
                                    .when(col("LEVEL_OF_DAMAGE") == 3,"Moderate (unit towed away)")\
                                    .when(col("LEVEL_OF_DAMAGE") == 4,"Major (unit towed away)")\
                                    .when(col("LEVEL_OF_DAMAGE") == 5,"Extensive (unrepairable)")\
                                    .when(col("LEVEL_OF_DAMAGE") == 6,"Nil damage")\
                                    .otherwise("Not known"))\
                        .withColumn("TRAILER_TYPE_DESC", 
                                     when(col("TRAILER_TYPE") == "A","Caravan")\
                                    .when(col("TRAILER_TYPE") == "B","Trailer (general)")\
                                    .when(col("TRAILER_TYPE") == "C","Trailer (boat)")\
                                    .when(col("TRAILER_TYPE") == "D","Horse float")\
                                    .when(col("TRAILER_TYPE") == "E","Machinery")\
                                    .when(col("TRAILER_TYPE") == "F","Farm/agricultural equipment")\
                                    .when(col("TRAILER_TYPE") == "G","Not known what is being towed")\
                                    .when(col("TRAILER_TYPE") == "H","Not Applicable")\
                                    .when(col("TRAILER_TYPE") == "I","Trailer (exempt)")\
                                    .when(col("TRAILER_TYPE") == "J","Semi Trailer")\
                                    .when(col("TRAILER_TYPE") == "K","Pig Trailer")\
                                    .when(col("TRAILER_TYPE") == "L","Dog Trailer")\
                                    .otherwise("Not known"))\
                        .withColumn("DRIVER_INTENT_DESC", 
                                     when(col("DRIVER_INTENT") == "01","Going straight ahead")\
                                    .when(col("DRIVER_INTENT") == "02","Turning right")\
                                    .when(col("DRIVER_INTENT") == "03","Turning left")\
                                    .when(col("DRIVER_INTENT") == "04","Leaving a driveway")\
                                    .when(col("DRIVER_INTENT") == "05","U turning")\
                                    .when(col("DRIVER_INTENT") == "06","Changing lanes")\
                                    .when(col("DRIVER_INTENT") == "07","Overtaking")\
                                    .when(col("DRIVER_INTENT") == "08","Merging")\
                                    .when(col("DRIVER_INTENT") == "09","Reversing")\
                                    .when(col("DRIVER_INTENT") == "10","Parking or unparking")\
                                    .when(col("DRIVER_INTENT") == "11","Parked legally")\
                                    .when(col("DRIVER_INTENT") == "12","Parked illegally")\
                                    .when(col("DRIVER_INTENT") == "13","Stationary accident")\
                                    .when(col("DRIVER_INTENT") == "14","Stationary broken down")\
                                    .when(col("DRIVER_INTENT") == "15","Other stationary")\
                                    .when(col("DRIVER_INTENT") == "16","Avoiding animals")\
                                    .when(col("DRIVER_INTENT") == "17","Slow/stopping")\
                                    .when(col("DRIVER_INTENT") == "18","Out of control")\
                                    .when(col("DRIVER_INTENT") == "19","Wrong way")\
                                    .when(col("DRIVER_INTENT") == "99","Not known")\
                                    .otherwise("Not known"))\
                        .withColumn("VEHICLE_MOVEMENT_DESC", 
                                     when(col("VEHICLE_MOVEMENT") == "01","Going straight ahead")\
                                    .when(col("VEHICLE_MOVEMENT") == "02","Turning right")\
                                    .when(col("VEHICLE_MOVEMENT") == "03","Turning left")\
                                    .when(col("VEHICLE_MOVEMENT") == "04","Leaving a driveway")\
                                    .when(col("VEHICLE_MOVEMENT") == "05","U turning")\
                                    .when(col("VEHICLE_MOVEMENT") == "06","Changing lanes")\
                                    .when(col("VEHICLE_MOVEMENT") == "07","Overtaking")\
                                    .when(col("VEHICLE_MOVEMENT") == "08","Merging")\
                                    .when(col("VEHICLE_MOVEMENT") == "09","Reversing")\
                                    .when(col("VEHICLE_MOVEMENT") == "10","Parking or unparking")\
                                    .when(col("VEHICLE_MOVEMENT") == "11","Parked legally")\
                                    .when(col("VEHICLE_MOVEMENT") == "12","Parked illegally")\
                                    .when(col("VEHICLE_MOVEMENT") == "13","Stationary accident")\
                                    .when(col("VEHICLE_MOVEMENT") == "14","Stationary broken down")\
                                    .when(col("VEHICLE_MOVEMENT") == "15","Other stationary")\
                                    .when(col("VEHICLE_MOVEMENT") == "16","Avoiding animals")\
                                    .when(col("VEHICLE_MOVEMENT") == "17","Slow/stopping")\
                                    .when(col("VEHICLE_MOVEMENT") == "18","Out of control")\
                                    .when(col("VEHICLE_MOVEMENT") == "19","Wrong way")\
                                    .when(col("VEHICLE_MOVEMENT") == "99","Not known")\
                                    .otherwise("Not known"))\
                        .withColumn("VEHICLE_MAKE", 
                                     when(col("VEHICLE_MAKE") == "B.M.W.","B M W")\
                                    .when(col("VEHICLE_MAKE") == "B.M.","B M W")\
                                    .when(col("VEHICLE_MAKE") == "CATERP","CATPLR")\
                                    .when(col("VEHICLE_MAKE") == "CF MOT","CFMO")\
                                    .when(col("VEHICLE_MAKE") == "CHEVRO","CHEV")\
                                    .when(col("VEHICLE_MAKE") == "CHRYSL","CHRYS")\
                                    .when(col("VEHICLE_MAKE") == "CITROE","CITRN")\
                                    .when(col("VEHICLE_MAKE") == "H DA","H DAV")\
                                    .when(col("VEHICLE_MAKE") == "HSQV","HSQVRN")\
                                    .when(col("VEHICLE_MAKE") == "HYND","HYNDAI")\
                                    .when(col("VEHICLE_MAKE") == "HYU","HYNDAI")\
                                    .when(col("VEHICLE_MAKE") == "HYUNDA","HYNDAI")\
                                    .when(col("VEHICLE_MAKE") == "HYUNDI","HYNDAI")\
                                    .when(col("VEHICLE_MAKE") == "KAWS","KAWASA")\
                                    .when(col("VEHICLE_MAKE") == "KAWASK","KAWASA")\
                                    .when(col("VEHICLE_MAKE") == "KENWOR","KENWTH")\
                                    .when(col("VEHICLE_MAKE") == "L RO","L ROV")\
                                    .when(col("VEHICLE_MAKE") == "LROVER","L ROV")\
                                    .when(col("VEHICLE_MAKE") == "MER","MERC B")\
                                    .when(col("VEHICLE_MAKE") == "MERCBZ","MERC B")\
                                    .when(col("VEHICLE_MAKE") == "MERCED","MERC B")\
                                    .when(col("VEHICLE_MAKE") == "MISTUB","MITSUB")\
                                    .when(col("VEHICLE_MAKE") == "NIS","NISSAN")\
                                    .when(col("VEHICLE_MAKE") == "PEU","PEUG")\
                                    .when(col("VEHICLE_MAKE") == "PEUGEO","PEUG")\
                                    .when(col("VEHICLE_MAKE") == "PGO","PEUG")\
                                    .when(col("VEHICLE_MAKE") == "RENAUL","REN")\
                                    .when(col("VEHICLE_MAKE") == "TRIUMP","TRIUM")\
                                    .when(col("VEHICLE_MAKE") == "UNK","UNKN")\
                                    .when(col("VEHICLE_MAKE") == "UNKW","UNKN")\
                                    .when(col("VEHICLE_MAKE") == "UNSPEC","UNKN")\
                                    .when(col("VEHICLE_MAKE") == "VOLKSW","VOLKS")\
                                    .when(col("VEHICLE_MAKE") == "VW","VOLKS")
                                    .otherwise(col("VEHICLE_MAKE")))\
                    .withColumn("VM_IND",  # 0 for unknown/blank makes so they are not counted in the make ranking
                                when(F.trim(col("VEHICLE_MAKE")).isin("UNKN", ""), 0).otherwise(1))\
                    .withColumn("VMCNT",F.sum("VM_IND").over(Window.partitionBy("VEHICLE_MAKE")))\
                    .withColumn("VMRANK", F.dense_rank().over(Window.partitionBy().orderBy(desc("VMCNT"))))\
                    .withColumn("VEHICLE_MAKE", when(col("VMRANK") <= 15 ,col("VEHICLE_MAKE"))\
                                .otherwise("OTHER MAKE"))\
                    .withColumn("VEHICLE_CHANGED_DIRECTION",
                                when(col("INITIAL_DIRECTION").isNull() | col("FINAL_DIRECTION").isNull()
                                     | (col("INITIAL_DIRECTION") == 'NK') | (col("FINAL_DIRECTION") == 'NK'), "Not known")\
                                .when(col("INITIAL_DIRECTION") == col("FINAL_DIRECTION"), "No")\
                                .otherwise("Yes"))\
                    .withColumn("VEHICLE_COLOUR_1",when(col("VEHICLE_COLOUR_1") == "BLK", "Black")\
                            .when(col("VEHICLE_COLOUR_1") == "BLU", "Blue")\
                            .when(col("VEHICLE_COLOUR_1") == "BRN", "Brown")\
                            .when(col("VEHICLE_COLOUR_1") == "CRM", "Cream")\
                            .when(col("VEHICLE_COLOUR_1") == "FWN", "Fawn")\
                            .when(col("VEHICLE_COLOUR_1") == "GLD", "Gold")\
                            .when(col("VEHICLE_COLOUR_1") == "GRN", "Green")\
                            .when(col("VEHICLE_COLOUR_1") == "GRY", "Grey")\
                            .when(col("VEHICLE_COLOUR_1") == "MRN", "Maroon")\
                            .when(col("VEHICLE_COLOUR_1") == "MVE", "Mauve")\
                            .when(col("VEHICLE_COLOUR_1") == "OGE", "Orange")\
                            .when(col("VEHICLE_COLOUR_1") == "PNK", "Pink")\
                            .when(col("VEHICLE_COLOUR_1") == "PUR", "Purple")\
                            .when(col("VEHICLE_COLOUR_1") == "RED", "Red")\
                            .when(col("VEHICLE_COLOUR_1") == "SIL", "Silver")\
                            .when(col("VEHICLE_COLOUR_1") == "WHI", "White")\
                            .when(col("VEHICLE_COLOUR_1") == "YLW", "Yellow")\
                            .when(col("VEHICLE_COLOUR_1") == "ZZ", "Unknown or N/A")\
                            .otherwise("Unknown or N/A"))\
                    .withColumn("VEHICLE_COLOUR_2",when(col("VEHICLE_COLOUR_2") == "BLK", "Black")\
                            .when(col("VEHICLE_COLOUR_2") == "BLU", "Blue")\
                            .when(col("VEHICLE_COLOUR_2") == "BRN", "Brown")\
                            .when(col("VEHICLE_COLOUR_2") == "CRM", "Cream")\
                            .when(col("VEHICLE_COLOUR_2") == "FWN", "Fawn")\
                            .when(col("VEHICLE_COLOUR_2") == "GLD", "Gold")\
                            .when(col("VEHICLE_COLOUR_2") == "GRN", "Green")\
                            .when(col("VEHICLE_COLOUR_2") == "GRY", "Grey")\
                            .when(col("VEHICLE_COLOUR_2") == "MRN", "Maroon")\
                            .when(col("VEHICLE_COLOUR_2") == "MVE", "Mauve")\
                            .when(col("VEHICLE_COLOUR_2") == "OGE", "Orange")\
                            .when(col("VEHICLE_COLOUR_2") == "PNK", "Pink")\
                            .when(col("VEHICLE_COLOUR_2") == "PUR", "Purple")\
                            .when(col("VEHICLE_COLOUR_2") == "RED", "Red")\
                            .when(col("VEHICLE_COLOUR_2") == "SIL", "Silver")\
                            .when(col("VEHICLE_COLOUR_2") == "WHI", "White")\
                            .when(col("VEHICLE_COLOUR_2") == "YLW", "Yellow")\
                            .when(col("VEHICLE_COLOUR_2") == "ZZ", "Unknown or N/A")\
                            .otherwise("Unknown or N/A"))\
                    .sort("ACCIDENT_NO")\
                    .select("ACCIDENT_NO","VEHICLE_ID","VEHICLE_YEAR_MANUF", "VEHICLE_DCA_CODE", "VEHICLE_DCA_DESC", 
                            "ROAD_SURFACE_TYPE","ROAD_SURFACE_TYPE_DESC","REG_STATE_GRP",
                            "VEHICLE_BODY_STYLE","VEHICLE_MAKE","VEHICLE_MODEL","VEHICLE_TYPE",
                            "VEHICLE_TYPE_DESC","FUEL_TYPE","FUEL_TYPE_DESC",
                            "TOTAL_NO_OCCUPANTS","DRIVER_INTENT", "DRIVER_INTENT_DESC",
                            "VEHICLE_MOVEMENT","VEHICLE_MOVEMENT_DESC","TRAILER_TYPE","TRAILER_TYPE_DESC",
                            "VEHICLE_COLOUR_1","VEHICLE_COLOUR_2","CAUGHT_FIRE","INITIAL_IMPACT","INITIAL_IMPACT_DESC",
                            "LAMPS","LAMPS_ON","LEVEL_OF_DAMAGE","LEVEL_OF_DAMAGE_DESC","OWNER_POSTCODE",
                            "TRAFFIC_CONTROL","TRAFFIC_CONTROL_DESC","VEHICLE_CHANGED_DIRECTION")\
                    .cache()

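The `VM_IND`, `VMCNT` and `VMRANK` columns keep the makes whose dense rank by frequency is at most 15 and relabel the rest as `OTHER MAKE`; unknown or blank makes get `VM_IND = 0`, so they count as zero and rank last. A plain-Python sketch of that rule, assuming `UNKN` and blank (after trimming) are the only unknown codes; the function name and sample makes are hypothetical:

```python
from collections import Counter

# Assumption: make codes treated as unknown once surrounding spaces are trimmed
UNKNOWN_MAKES = {"UNKN", ""}

def collapse_makes(makes, top_n=15, other="OTHER MAKE"):
    """Keep makes whose dense rank by count is <= top_n; relabel the rest as `other`.

    Unknown/blank makes add 0 to their count (like VM_IND = 0),
    so they sort to the bottom of the ranking.
    """
    counts = Counter()
    for m in makes:
        key = m.strip()
        counts[key] += 0 if key in UNKNOWN_MAKES else 1
    # Dense rank: distinct counts in descending order, tied counts share a rank
    distinct_counts = sorted(set(counts.values()), reverse=True)
    rank_of_count = {c: r for r, c in enumerate(distinct_counts, start=1)}
    return [m.strip() if rank_of_count[counts[m.strip()]] <= top_n else other
            for m in makes]
```

For example, `collapse_makes(["TOYOTA", "TOYOTA", "HOLDEN", "FORD", "UNKN"], top_n=2)` keeps `HOLDEN` and `FORD` together at rank 2: because `dense_rank` gives tied counts the same rank, more than `top_n` makes can survive.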

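The colour decoding above repeats one `when` per code for each colour column. A sketch of the same lookup as a dictionary with a default, in plain Python (the names `COLOUR_CODES` and `decode_colour` are hypothetical):

```python
# Colour codes and labels copied from the VEHICLE_COLOUR_1 / VEHICLE_COLOUR_2 mapping
COLOUR_CODES = {
    "BLK": "Black", "BLU": "Blue", "BRN": "Brown", "CRM": "Cream",
    "FWN": "Fawn", "GLD": "Gold", "GRN": "Green", "GRY": "Grey",
    "MRN": "Maroon", "MVE": "Mauve", "OGE": "Orange", "PNK": "Pink",
    "PUR": "Purple", "RED": "Red", "SIL": "Silver", "WHI": "White",
    "YLW": "Yellow",
}
UNKNOWN_COLOUR = "Unknown or N/A"

def decode_colour(code):
    """Return the colour name; 'ZZ', None and any unlisted code map to UNKNOWN_COLOUR."""
    return COLOUR_CODES.get(code, UNKNOWN_COLOUR)
```

In PySpark, a commonly used pattern is to build a map column from such a dictionary with `F.create_map` (over the flattened key/value pairs wrapped in `F.lit`) and wrap the lookup in `F.coalesce(..., F.lit("Unknown or N/A"))`, so both colour columns share one definition; this has not been tested against this dataset.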
In [None]:
# check number of records & number of columns after transformation
ndf_vehicle_count = ndf_vehicle.count()
ndf_vehicle_len = len(ndf_vehicle.columns)
print("Number of Records: ", ndf_vehicle_count, " Number of columns: ", ndf_vehicle_len)

In [None]:
# check the result
ndf_vehicle.show(2,vertical=True,truncate=False)

In [None]:
df_shape_clean = spark.createDataFrame(
    [
        ("df_accident", "Number of Records", ndf_accident_count ), 
        ("df_accident", "Number of Columns", ndf_accident_len), 
        ("df_atmospheric_cond", "Number of Records", ndf_atmospheric_cond_count), 
        ("df_atmospheric_cond", "Number of Columns", ndf_atmospheric_cond_len), 
        ("df_node", "Number of Records", ndf_node_count ), 
        ("df_node", "Number of Columns", ndf_node_len), 
        ("df_person", "Number of Records", ndf_person_count ), 
        ("df_person", "Number of Columns", ndf_person_len), 
        ("df_surface_cond", "Number of Records", ndf_surface_cond_count ), 
        ("df_surface_cond", "Number of Columns", ndf_surface_cond_len), 
        ("df_vehicle", "Number of Records", ndf_vehicle_count), 
        ("df_vehicle", "Number of Columns", ndf_vehicle_len), 
    ],
    ["DATAFRAME", "SHAPE","TRANSFORM RESULTS"] 
)

In [None]:
# summary
# check the shape of the clean dataframe
df_shape_clean.groupBy("DATAFRAME").pivot("SHAPE").sum().sort("Number of Records","Number of Columns", ascending=False).show()

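`groupBy("DATAFRAME").pivot("SHAPE").sum()` turns the long (DATAFRAME, SHAPE, value) rows into one row per dataframe with one column per SHAPE. A plain-Python sketch of that reshaping, with a hypothetical function name and values (unlike Spark, which returns null for a missing combination, the sketch simply omits it):

```python
from collections import defaultdict

def pivot_shapes(rows):
    """Reshape (DATAFRAME, SHAPE, value) triples into {DATAFRAME: {SHAPE: summed value}}."""
    table = defaultdict(lambda: defaultdict(int))
    for name, shape, value in rows:
        table[name][shape] += value  # sum() per (dataframe, shape) cell
    return {name: dict(shapes) for name, shapes in table.items()}
```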
In [None]:
#Clear Cache for base dataframes
df_accident.unpersist()
df_accident_event.unpersist()
df_atmospheric_cond.unpersist()
df_node.unpersist()
df_node_id.unpersist()
df_person.unpersist()
df_subdca.unpersist()
df_surface_cond.unpersist()
df_vehicle.unpersist()

[Back to Table of Contents](#toc)

<a class="anchor" id="transform_part2"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h3>Part 2: Join all the clean dataframes into a joined dataframe </h3>
</div>

To improve performance, the join operations are split into stages based on the *number of records and the matching keys* of each dataframe:  
1. Join operation stage 1: `ndf_accident` + `ndf_atmospheric_cond` + `ndf_surface_cond` + `ndf_node`
2. Join operation stage 2: `ndf_person` + `ndf_vehicle`
3. Join operation final stage: join the results of stages 1 and 2

In [None]:
# check whether any performance fine-tuning is required
# partition fine-tuning is only required if the data is skewed
# the checks below are turned off (commented out) when no action is required.

# check the partition distribution
#ndf_accident.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().orderBy(asc("count")).show()
#ndf_atmospheric_cond.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().orderBy(asc("count")).show()
#ndf_surface_cond.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().orderBy(asc("count")).show()
#ndf_node.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().orderBy(asc("count")).show()

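Instead of reading the per-partition counts by eye, the output of the commented-out checks (for example, the counts collected into a Python list) could be summarised as the ratio of the largest partition to the mean partition size. A minimal plain-Python sketch; the function names and the 2.0 threshold are hypothetical choices, not values used in this notebook:

```python
from statistics import mean

def skew_ratio(partition_counts):
    """Largest partition size divided by the mean size (1.0 means perfectly even)."""
    if not partition_counts:
        raise ValueError("need at least one partition count")
    avg = mean(partition_counts)
    return max(partition_counts) / avg if avg else 0.0

def is_skewed(partition_counts, threshold=2.0):
    """Flag skew when the largest partition exceeds `threshold` times the mean (threshold is an assumption)."""
    return skew_ratio(partition_counts) > threshold
```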

In [None]:
# Declare aliases 
a = ndf_accident.alias('a')
ac = ndf_atmospheric_cond.alias('ac')
sc = ndf_surface_cond.alias('sc')
n = ndf_node.alias('n')
p = ndf_person.alias('p')
v = ndf_vehicle.alias('v')

In [None]:
# composite keys condition
cond1 = [a.ACCIDENT_NO == n.ACCIDENT_NO, a.NODE_ID == n.NODE_ID] 

## Join Operation 1
# datasets a, ac and sc have the same number of records, so an inner join is used
# dataset n has fewer records than a, ac and sc
# dataset a contains invalid NODE_IDs; a left join (not an inner join) is used so that valid accident records are not removed along with the invalid NODE_IDs
# where no matching node is found, fill NODE_ID with 0 and the missing node columns with 'None' (placeholders for missing node information)

# drop the duplicate join-key columns
df_crash_stage1 = a.join(ac, a.ACCIDENT_NO == ac.ACCIDENT_NO, how='inner')\
                    .join(sc, a.ACCIDENT_NO == sc.ACCIDENT_NO, how='inner')\
                    .join(n, cond1, how='left')\
                    .drop(ac["ACCIDENT_NO"]).drop(sc["ACCIDENT_NO"]).drop(n["ACCIDENT_NO"]).drop(a["NODE_ID"])\
                    .fillna({"NODE_ID": 0}).fillna('None')\
                    .cache()

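Stage 1's left join on the composite key (`ACCIDENT_NO`, `NODE_ID`) can be illustrated with a small plain-Python sketch (not the Spark implementation). It assumes the node side has at most one row per key and uses `LGA_NAME` as the only node attribute; the function name and sample values are hypothetical. An accident whose `NODE_ID` has no match keeps its row, with `NODE_ID` set to 0 and node attributes set to `'None'`, mirroring `.drop(a["NODE_ID"])` followed by `.fillna({"NODE_ID": 0}).fillna('None')`:

```python
def left_join_nodes(accidents, nodes, node_fields=("LGA_NAME",), placeholder_node_id=0):
    """Left-join accident rows to node rows on (ACCIDENT_NO, NODE_ID)."""
    # Index node rows by the composite key (assumes the key is unique on the node side)
    node_index = {(n["ACCIDENT_NO"], n["NODE_ID"]): n for n in nodes}
    joined = []
    for acc in accidents:
        match = node_index.get((acc["ACCIDENT_NO"], acc["NODE_ID"]))
        row = dict(acc)
        if match is None:
            # No valid node: keep the accident record and fill placeholders
            row["NODE_ID"] = placeholder_node_id
            row.update({f: "None" for f in node_fields})
        else:
            row.update({f: match[f] for f in node_fields})
        joined.append(row)
    return joined
```

An inner join would instead drop the unmatched accident entirely, which is why it is avoided for this step.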
In [None]:
#composite keys condition
cond2 = [p.ACCIDENT_NO == v.ACCIDENT_NO, p.VEHICLE_ID == v.VEHICLE_ID] 

# Join Operation 2
# A left join is used because
# 1. not all people involved in accidents are in vehicles (e.g. pedestrians)
# 2. sometimes the information is not collected by the police.

df_crash_stage2 = p.join(v, cond2, how='left')\
                    .drop(v["ACCIDENT_NO"]).drop(v["VEHICLE_ID"])\
                    .fillna('N/A').fillna(0)\
                    .cache()

In [None]:
dfc1 = df_crash_stage1.alias('dfc1') # contains information about the accidents
dfc2 = df_crash_stage2.alias('dfc2') # contains the people and vehicles involved in the accidents

# Final stage join operations
# broadcast-join variant, kept for the performance comparison with the sort-merge join below
#df_crash_temp1 = dfc2.join(broadcast(dfc1), dfc2.ACCIDENT_NO == dfc1.ACCIDENT_NO, how = "inner" )\
df_crash_temp1 = dfc2.join(dfc1, dfc2.ACCIDENT_NO == dfc1.ACCIDENT_NO, how = "inner" )\
        .drop(dfc1["ACCIDENT_NO"])
       # .cache()

In [None]:
#Isolate step for logs
#df_crash_temp1.take(10)

In [None]:
# check result
#df_crash_temp1.limit(3).toPandas()
df_crash_temp1.show(1, vertical=True,truncate=False)

In [None]:
# check number of records & number of columns after transformation
df_crash_stage1_count = df_crash_stage1.count()
df_crash_stage2_count = df_crash_stage2.count()
df_crash_final_count = df_crash_temp1.count()
df_crash_final_len = len(df_crash_temp1.columns)

In [None]:
# Print summary of output: check the number of records before & after join
print("Number of Records")
print("Stage 1    | Target : ", ndf_accident_count ," After join result : ", df_crash_stage1_count)
print("Stage 2    | Target : ", ndf_person_count ," After join result : ", df_crash_stage2_count)
print("Final Stage| Target : ", df_crash_stage2_count," After join result : ", df_crash_final_count)
print(" Number of columns: ", df_crash_final_len)

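The targets printed above rely on a property of left joins: each left row appears once per matching right row, or once (with nulls) if it has no match, so the row count is preserved only when the join keys are unique on the right side. A plain-Python sketch of that counting rule; the function name and keys are hypothetical, and composite keys can be passed as tuples:

```python
from collections import Counter

def left_join_row_count(left_keys, right_keys):
    """Rows produced by a left join: max(1, number of right matches) per left row."""
    right_counts = Counter(right_keys)
    return sum(max(1, right_counts[k]) for k in left_keys)
```

If the stage 2 count exceeded `ndf_person_count`, this rule would point to duplicate (`ACCIDENT_NO`, `VEHICLE_ID`) keys in `ndf_vehicle`.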
In [None]:
# check result
df_crash_temp1.limit(1).show(1,vertical=True,truncate=False)

In [None]:
# check that the invalid NODE_IDs (-1, -3, -10) have been removed; the count should be 0
# (in the source data, -1 is used as a placeholder for missing node information)
df_crash_stage1.filter(col('NODE_ID').isin('-1','-3','-10')).count()

In [None]:
# Clear cache for cleaned base Dataframes no longer being referred to below:
ndf_accident.unpersist()
ndf_atmospheric_cond.unpersist()
ndf_node.unpersist()
ndf_person.unpersist()
ndf_surface_cond.unpersist()
ndf_vehicle.unpersist()


[Back to Table of Contents](#toc)

<a class="anchor" id="transform_part3"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h3>Part 3: Perform exploration, cleaning & transformation on the joined dataframe </h3>
</div>

In [None]:
## Add Vehicle Age - based on the Year of Manufacture (vehicle) and the Accident Date (accident).
## The accident year is taken from the last 4 characters of ACCIDENTDATE (assumed to end in a 4-digit year).
## A missing or placeholder year of manufacture (null, or 0 / 'N/A' after the stage-2 fillna) gives a null age, i.e. "Unknown".
accident_year = F.substring(col("ACCIDENTDATE"), -4, 4).cast("int")
manuf_year = col("VEHICLE_YEAR_MANUF").cast("int")

df_crash_temp2 = df_crash_temp1.withColumn("VEHICLE_AGE_YRS",
                                           when(manuf_year > 0, accident_year - manuf_year))\
                                .withColumn("VEHICLE_AGE_GROUP", 
                                     when(col("VEHICLE_AGE_YRS").isNull(),"Unknown")\
                                    .when((col("VEHICLE_AGE_YRS") >= 0) & (col("VEHICLE_AGE_YRS") <= 3),"0-3")\
                                    .when((col("VEHICLE_AGE_YRS") >= 4) & (col("VEHICLE_AGE_YRS") <= 7),"4-7")\
                                    .when((col("VEHICLE_AGE_YRS") >= 8) & (col("VEHICLE_AGE_YRS") <= 10),"8-10")\
                                    .when(col("VEHICLE_AGE_YRS") >= 11,"11+")\
                                    .otherwise("Unknown"))

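The age binning can be checked outside Spark with a small plain-Python sketch for a single record. It assumes `ACCIDENTDATE` ends in a four-digit year (which is what extracting its last four characters relies on) and treats `None`, `0` and `'N/A'` as an unknown year of manufacture, since stage 2 fills missing values with `'N/A'` and `0`. The function name and sample values are hypothetical:

```python
def vehicle_age_group(accident_date, year_manuf):
    """Bin a vehicle's age at the time of the accident into 0-3, 4-7, 8-10, 11+ or Unknown."""
    try:
        manuf = int(year_manuf)
    except (TypeError, ValueError):  # None or 'N/A'
        return "Unknown"
    if manuf <= 0:  # 0 placeholder
        return "Unknown"
    age = int(accident_date[-4:]) - manuf  # last 4 characters = accident year
    if age < 0:  # manufactured after the accident: inconsistent record
        return "Unknown"
    if age <= 3:
        return "0-3"
    if age <= 7:
        return "4-7"
    if age <= 10:
        return "8-10"
    return "11+"
```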
In [None]:
# display result
df_crash_temp2.select("ACCIDENTDATE","VEHICLE_YEAR_MANUF","VEHICLE_AGE_YRS","VEHICLE_AGE_GROUP").show(5)

In [None]:
# FORMAT cleaned dataset for exporting

# rename the description and select relevant attributes
## drop the code (key id) columns and keep their descriptions
## rename the description attributes without the "_DESC" suffix

# re-organise the column order
## Records ID
## Classification field
## ACCIDENT + ROAD SURFACE + ATMOSPHERIC
## PERSON
## NODE
## VEHICLE

df_crash_final = df_crash_temp2.select("ACCIDENT_NO","PERSON_ID", "NODE_ID", "VEHICLE_ID",
                                       col("SEVERITY_DESC").alias("SEVERITY"), col("INJ_LEVEL_DESC").alias("INJ_LEVEL"),
                                       "ACCIDENT_MONTH", col("ACCIDENT_HOUR_GROUP").alias("ACCIDENT_HOUR"), col("DAY_OF_WEEK_DESC").alias("DAY_OF_WEEK"), col("ACCIDENT_TYPE_DESC").alias("ACCIDENT_TYPE"), col("DCA_DESC").alias("DCA"),col("LIGHT_CONDITION_DESC").alias("LIGHT_COND"),col("ROAD_GEOMETRY_DESC").alias("ROAD_GEOMETRY"),"POLICE_ATTEND",col("SPEED_ZONE_DESC").alias("SPEED_ZONE"),
                                       "SURFACE_DRY","SURFACE_WET","SURFACE_MUDDY", "SURFACE_SNOWY","SURFACE_ICY","SURFACE_UNKNOWN",
                                       "ATMOSPH_CLEAR","ATMOSPH_RAINING","ATMOSPH_SNOWING","ATMOSPH_FOG","ATMOSPH_SMOKE","ATMOSPH_DUST","ATMOSPH_STRONG_WINDS","ATMOSPH_UNKNOWN",
                                       col("NODE_TYPE_DESC").alias("NODE_TYPE"),"LGA_NAME","REGION_NAME","DEG_URBAN_NAME", 
                                       "SEX","AGE_GROUP","LICENCE_STATE",col("ROAD_USER_TYPE_DESC").alias("ROAD_USER_TYPE"),
                                       "TAKEN_HOSPITAL","EJECTED_CODE",col("SEATING_POSITION_DESC").alias("SEATING_POSITION"),col("HELMET_BELT_WORN_DESC").alias("HELMET_BELT_WORN"),col("PEDEST_MOVEMENT_DESC").alias("PEDEST_MOVEMENT"),
                                       "VEHICLE_AGE_GROUP", "REG_STATE_GRP","VEHICLE_TYPE_DESC","FUEL_TYPE_DESC",
                                       "VEHICLE_MAKE",col("TRAILER_TYPE_DESC").alias("TRAILER_TYPE"), "VEHICLE_COLOUR_1","VEHICLE_COLOUR_2"  ,                                                        
                                       col("VEHICLE_DCA_DESC").alias("VEHICLE_DCA"),col("VEHICLE_MOVEMENT_DESC").alias("VEHICLE_MOVEMENT"), "VEHICLE_CHANGED_DIRECTION",col("DRIVER_INTENT_DESC").alias("DRIVER_INTENT"),col("ROAD_SURFACE_TYPE_DESC").alias("ROAD_SURFACE_TYPE"),
                                       "TOTAL_NO_OCCUPANTS",col("INITIAL_IMPACT_DESC").alias("INITIAL_IMPACT"),col("LEVEL_OF_DAMAGE_DESC").alias("LEVEL_OF_DAMAGE"),"CAUGHT_FIRE", "LAMPS_ON",col("TRAFFIC_CONTROL_DESC").alias("TRAFFIC_CONTROL"))\
                                 .cache()


# Clear the cache of the join-stage dataframes, which are no longer required
df_crash_stage1.unpersist()
df_crash_stage2.unpersist()


In [None]:
# Confirm Schema / Data Types are as expected after joins
df_crash_final.printSchema()

In [None]:
# Summary of Statistics
df_crash_final.describe().toPandas()

In [None]:
# Distinct Values for each column
df_crash_final.select([F.countDistinct(c).alias(c) for c in df_crash_final.columns]).show(vertical=True,truncate=False)

In [None]:
# checking missing/null values : df_crash_final
tpdf = df_crash_final.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_crash_final.columns]).toPandas().transpose()
tpdf[(tpdf > 0).any(axis=1)]

In [None]:
# check duplicate
df_crash_final.groupBy("ACCIDENT_NO","PERSON_ID").count().filter("count > 1").show()

In [None]:
### Column distribution analysis
import matplotlib.pyplot as plt
plt.close("all")

# Plot the distribution of columns with a small enough number of distinct values for a high-level analysis of the data.
# Each entry is [column name, pandas plot kind]
catg = [["LEVEL_OF_DAMAGE","barh"],["ACCIDENT_MONTH","bar"], ["VEHICLE_MAKE","bar"],["INITIAL_IMPACT","barh"],
        ["TRAFFIC_CONTROL","bar"],["AGE_GROUP","bar"],["HELMET_BELT_WORN","bar"],["VEHICLE_COLOUR_1","bar"],
        ["SPEED_ZONE","bar"]]

fig, axs = plt.subplots(3, 3, figsize=(15,10))

# one subplot per column: the aggregation runs in Spark and only the small count table is converted to pandas
for ax, (cur, kind) in zip(axs.flat, catg):
    sdf = df_crash_final.groupBy(cur).count()
    sdf.toPandas().set_index(cur).plot(kind=kind, ax=ax, legend=False)

fig.subplots_adjust(wspace=0.75, hspace=1)

In [None]:
# check the unique values of each column (top 20 by frequency)
for col_name in df_crash_final.columns:
    df_crash_final.groupBy(col_name).count().sort(col("count").desc()).limit(20).show(truncate=False)

In [None]:
# check result
df_crash_final.limit(1).toPandas()

In [None]:
# check number of records & number of columns after final transformation
df_crash_final_count = df_crash_final.count()
df_crash_final_len = len(df_crash_final.columns)

# store the transformation result
df_shape_transform = spark.createDataFrame(
    [
        ("df_crash_final", "Number of Records", df_crash_final_count ), 
        ("df_crash_final", "Number of Columns", df_crash_final_len), 
    ],
    ["DATAFRAME", "SHAPE","TRANSFORM RESULTS"] 
)
df_shape_transform.toPandas()  #.show()

In [None]:
# summary of severity and injury
df_crash_final.groupBy('SEVERITY','INJ_LEVEL').count().sort('SEVERITY','INJ_LEVEL').toPandas()

[Back to Table of Contents](#toc)

<a class="anchor" id="transformation_summary"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h3> Summary</h3>
<hr/>
This section provides a summary of the data cleaning and transformation results. It includes:  <br>
 <br> - a comparison of the total number of records and columns at each stage: Data Loading, Data Cleaning and Transformation   
</div>

In [None]:
# join the shape results from the three activities: Data Loading, Data Cleaning and Transformation

# declare alias
i = df_shape_initial.alias('i')
c = df_shape_clean.alias('c')
t = df_shape_transform.alias('t')

# specify join condition
cond3 = [i.DATAFRAME == c.DATAFRAME, i.SHAPE == c.SHAPE]

In [None]:
# join these three shape results
# a full outer join is used so that no rows from the initial dataset are removed
# merge the duplicated DATAFRAME and SHAPE columns into single columns
# pivot the result for easy comparison

c.union(t).join(i, cond3, how ='outer')\
    .withColumn('M_DATAFRAME', 
                when(col('c.DATAFRAME').isNull(), i.DATAFRAME)\
                .when(col('i.DATAFRAME').isNull(), c.DATAFRAME)\
                .otherwise(col('i.DATAFRAME')))\
    .withColumn('M_SHAPE', 
                when(col('c.SHAPE').isNull(), i.SHAPE)\
                .when(col('i.SHAPE').isNull(), c.SHAPE)\
                .otherwise(col('i.SHAPE')))\
    .drop(c["DATAFRAME"]).drop(i["DATAFRAME"]).drop(c["SHAPE"]).drop(i["SHAPE"])\
    .withColumnRenamed("M_DATAFRAME", "DATAFRAME")\
    .withColumnRenamed("M_SHAPE", "SHAPE")\
    .select("DATAFRAME","SHAPE","INITIAL RESULTS","TRANSFORM RESULTS")\
    .groupBy('DATAFRAME').pivot("SHAPE").sum()\
    .withColumnRenamed("Number of Columns_sum(INITIAL RESULTS)", "Initial_Columns")\
    .withColumnRenamed("Number of Columns_sum(TRANSFORM RESULTS)", "Transform_Columns")\
    .withColumnRenamed("Number of Records_sum(INITIAL RESULTS)", "Initial_Records")\
    .withColumnRenamed("Number of Records_sum(TRANSFORM RESULTS)", "Transform_Records")\
    .sort('Transform_Records', ascending=False).show()

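In the cell above, the `when(...isNull()...)` chains merge the two copies of `DATAFRAME` and `SHAPE` left by the full outer join; they behave like `coalesce(i.X, c.X)` (take the initial side, else the clean/transform side), and PySpark's `F.coalesce` could express the same merge more compactly. A plain-Python sketch of the outer join on the (DATAFRAME, SHAPE) key, with a hypothetical function name and counts:

```python
def outer_join_shapes(initial, transformed):
    """Full outer join of two {(DATAFRAME, SHAPE): value} dicts.

    Keys present on only one side are kept, with None for the missing side;
    the key itself is already merged, as coalesce() does for the Spark columns.
    """
    keys = sorted(set(initial) | set(transformed))
    return [(df, shape, initial.get((df, shape)), transformed.get((df, shape)))
            for df, shape in keys]
```

Dataframes that exist only in the initial load (e.g. datasets not used for modelling) keep their initial counts with an empty transform result, and `df_crash_final` appears only on the transform side.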
VicRoads provides 11 files, of which only **6 datasets/files** are relevant to the scope of the modelling. After cleaning and transformation, **61 attributes** are retained for modelling. The cleaned dataset consists of the people and vehicles involved in accidents. Not all people involved in accidents are in vehicles (e.g. pedestrians), so the vehicle attributes of those records are empty (filled with placeholders such as `N/A` or 0). Moreover, the data collection relies on manual input by the police, so certain information might not have been collected; such values are either left blank or marked as unknown.

**Performance fine-tuning**  
As part of big data processing, the performance of the execution was monitored throughout. We checked the partitions for data skew, compared the performance of different join strategies (*sort-merge* and *broadcast-join*), and used `cache()` or `persist()` where appropriate, releasing cached dataframes with `unpersist()` once they were no longer needed, to keep the implementation efficient. 

**toPandas()**  
One observation was that `toPandas()` is less efficient, particularly when the output is not restricted first, because it collects the whole dataframe to the driver. For the purposes of the assignment submission (to achieve a readable, clean layout), we converted a few outputs to Pandas dataframes. `.show()` and `.take()` are more efficient alternatives, as is applying `.limit()` before `.collect()` or `.toPandas()`, since only a bounded number of rows is returned to the driver.

<a class="anchor" id="export_final"></a>
<div style="background:rgba(0,80,80,0.2);padding:10px;border-radius:4px"><h2>Export cleaned/reshaped dataset</h2>
<hr/>
This section focuses on exporting the cleaned joined dataframe to csv/parquet and closing the Spark session. 
</div>

In [None]:
# write to files

# Parquet format - not used for the Phase 2 submission, but kept here for later phases
df_crash_final.write.mode("overwrite").parquet("clean_data/df_crash_final.parquet")

#csv
#df_crash_final.write.mode("overwrite").csv("clean_data/df_crash_final.csv")

In [None]:
# stop spark session
#spark.stop()

[Back to Table of Contents](#toc)