# 1.1 - Data Preparation & Loading

In [14]:
# Import the necessary libraries

from pyspark.sql.functions import col
from pyspark.sql.functions import concat
from pyspark.sql.functions import lit
from pyspark.sql import Row
import pandas as pd
import ast

## 1.1.1

In [15]:
# Import SparkConf class into program
from pyspark import SparkConf
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

# Setup configuration parameters for Spark
master = "local[*]"
app_name = "msha0060 Assignment 1"
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

## 1.1.2

In [35]:
# Import 5 CSV Files
Units1 = sc.textFile("2015_DATA_SA_Units.csv")
Units2 = sc.textFile("2016_DATA_SA_Units.csv")
Units3 = sc.textFile("2017_DATA_SA_Units.csv")
Units4 = sc.textFile("2018_DATA_SA_Units.csv")
Units5 = sc.textFile("2019_DATA_SA_Units.csv")

# Sequentially merge Unit2, Unit3, Unit4, and then Unit5 into Unit1
UnitsA = Units1.union(Units2)
UnitsB = UnitsA.union(Units3)
UnitsC = UnitsB.union(Units4)
Units = UnitsC.union(Units5)

## 1.1.3

In [28]:
# Import 5 CSV Files
Crash1 = sc.textFile("2015_DATA_SA_Crash.csv")
Crash2 = sc.textFile("2016_DATA_SA_Crash.csv")
Crash3 = sc.textFile("2017_DATA_SA_Crash.csv")
Crash4 = sc.textFile("2018_DATA_SA_Crash.csv")
Crash5 = sc.textFile("2019_DATA_SA_Crash.csv")

# Sequentially merge Crash1, Crash3, Crash4, and then Crash5 into Crash1
CrashA = Crash1.union(Crash2)
CrashB = CrashA.union(Crash3)
CrashC = CrashB.union(Crash4)
Crash = CrashC.union(Crash5)

## 1.1.4

In [36]:
# Identify the header row
header = Units.first()

# Filter Units to include every row that is not the header row
Units = Units.filter(lambda x: x != header)

# Split every line on a comma, as these are CSV files
Units = Units.map(lambda x: x.split(','))

In [30]:
# Identify the header row
header = Crash.first()

# Filter Units to include every row that is not the header row
Crash = Crash.filter(lambda x: x != header)

# Split every line on a comma, as these are CSV files
Crash = Crash.map(lambda x: (x.split(',')))

In [8]:
# Count the number of rows in the Units RDD
print ('There are', Units.count(),'rows in the Units RDD excluding the header:\n')
Units.take(10)

There are 153859 rows in the Units RDD excluding the header:



[['"REPORT_ID"',
  '"Unit No"',
  '"No Of Cas"',
  '"Veh Reg State"',
  '"Unit Type"',
  '"Veh Year"',
  '"Direction Of Travel"',
  '"Sex"',
  '"Age"',
  '"Lic State"',
  '"Licence Class"',
  '"Licence Type"',
  '"Towing"',
  '"Unit Movement"',
  '"Number Occupants"',
  '"Postcode"',
  '"Rollover"',
  '"Fire"'],
 ['"2015-1-21/08/2019"',
  '"01"',
  '0',
  '"SA"',
  '"RIGID TRUCK LGE GE 4.5T"',
  '"1999"',
  '"North East"',
  '"Male"',
  '"052"',
  '"SA"',
  '"HRR "',
  '"Full"',
  '"Not Towing"',
  '"Swerving"',
  '"001"',
  '"5109"',
  '',
  ''],
 ['"2015-1-21/08/2019"',
  '"02"',
  '1',
  '"SA"',
  '"Motor Cars - Sedan"',
  '"2009"',
  '"North East"',
  '"Female"',
  '"057"',
  '"SA"',
  '"C "',
  '"Full"',
  '"Not Towing"',
  '"Straight Ahead"',
  '"002"',
  '"5125"',
  '',
  ''],
 ['"2015-2-21/08/2019"',
  '"01"',
  '0',
  '"SA"',
  '"Motor Cars - Sedan"',
  '"2009"',
  '"South East"',
  '"Male"',
  '"020"',
  '"SA"',
  '"MR"',
  '"Provisional 1 "',
  '"Not Towing"',
  '"Straight A

In [9]:
# Count the number of rows in the Crash RDD
print ('There are', Crash.count(),'rows in the Crash RDD excluding the header:\n')
Crash.take(10)

There are 72006 rows in the Crash RDD excluding the header:



[['"2015-1-21/08/2019"',
  '"2 Metropolitan"',
  '"ELIZABETH VALE"',
  '"5112"',
  '"CITY OF PLAYFORD."',
  '2',
  '1',
  '0',
  '0',
  '1',
  '2015',
  '"January"',
  '"Wednesday"',
  '"01:00 pm"',
  '"060"',
  '"T-Junction"',
  '"Straight road"',
  '"Level"',
  '"Not Applicable"',
  '"Sealed"',
  '"Dry"',
  '"Not Raining"',
  '"Daylight"',
  '"Side Swipe"',
  '"01"',
  '"Driver Rider"',
  '"2: MI"',
  '"No Control"',
  '""',
  '""',
  '1335254.54',
  '1690056.88',
  '"13352551690057"'],
 ['"2015-2-21/08/2019"',
  '"2 Metropolitan"',
  '"SALISBURY"',
  '"5108"',
  '"CITY OF SALISBURY"',
  '2',
  '1',
  '0',
  '0',
  '1',
  '2015',
  '"February"',
  '"Tuesday"',
  '"03:38 pm"',
  '"060"',
  '"Cross Road"',
  '"Straight road"',
  '"Level"',
  '"Not Applicable"',
  '"Sealed"',
  '"Dry"',
  '"Not Raining"',
  '"Daylight"',
  '"Rear End"',
  '"01"',
  '"Driver Rider"',
  '"2: MI"',
  '"Traffic Signals"',
  '""',
  '""',
  '1333389.6',
  '1688248.34',
  '"13333901688248"'],
 ['"2015-3-21/08

# 1.2 - Data Partitioning in RDD
## 1.2.1

In [10]:
# Find the number of partitions in each RDD
print ('There are', Units.getNumPartitions(),'partitions in the Units RDD\n')
print ('There are', Crash.getNumPartitions(),'partitions in the Crash RDD\n')

There are 10 partitions in the Units RDD

There are 10 partitions in the Crash RDD



#### By default, Spark performs Random-Equal Partitioning, wherein each processor receives an equal share (or sub-table) of the original table.

## 1.2.2 - A

In [24]:
# Transofrm Units into a Key-Value RDD (Units1), with REPORT_ID as the Key and all other columns as the Value

Units1 = Units\
.map(lambda x: (x[9], x[0:9]+x[10:]))

In [25]:
# Verifying that Units is now a Key-Value RDD

Units1.take(1)

[('"SA"',
  ['"2015-1-21/08/2019"',
   '"01"',
   '0',
   '"SA"',
   '"RIGID TRUCK LGE GE 4.5T"',
   '"1999"',
   '"North East"',
   '"Male"',
   '"052"',
   '"HRR "',
   '"Full"',
   '"Not Towing"',
   '"Swerving"',
   '"001"',
   '"5109"',
   '',
   ''])]

## 1.2.2 - B

In [26]:
# Define a range function based on the value of the key

def range_function(key):
    if key == '"SA"':
        return 0
    else:
        return 1

In [27]:
# Apply the range function to Units1

Units2 = Units1.partitionBy(2, range_function)
UnitsPartition=Units2.glom().collect()

In [28]:
# Verify the partitioning by printing out the first 5 keys in each partition

print('The key for the first row in Parition 0 (corresponding to SA) is: ',\
      [x[0].strip('""') for x in UnitsPartition[0]][:5])

# Filter out null and unknown keys for the sake of this verification
print('The key for the fifth row in Parition 1 (NOT corresponding to SA) is: ',\
      [x[0].strip('""') for x in UnitsPartition[1] if (x[0] != '' and x[0] != '"UNKNOWN"')][:5])

The key for the first row in Parition 0 (corresponding to SA) is:  ['SA', 'SA', 'SA', 'SA', 'SA']
The key for the fifth row in Parition 1 (NOT corresponding to SA) is:  ['QLD', 'O/S', 'VIC', 'QLD', 'NT']


## 1.2.2 - C

In [29]:
print ("The first partition, containing records specific to SA contains",len(UnitsPartition[0]),'records')
print ("The first partition, containing records specific to other areas contains",len(UnitsPartition[1]),'records')
print ("The 2 partitions together contain",len(UnitsPartition[1])+len(UnitsPartition[0]),'records')

The first partition, containing records specific to SA contains 109684 records
The first partition, containing records specific to other areas contains 44170 records
The 2 partitions together contain 153854 records


#### We can see that the Units corresponding to SA comprise ~71% of the total data, and that all other regions only account for ~29% of the total data. This represents a large skew towards records dealing with 'SA', as the 'Other' category includes several regions.

# 1.3 - Query Analysis
## 1.3.1

In [20]:
# Define entities that are not drivers
nonDrivers = ['"Guard Rail"','"Other Fixed Obstruction"','"Other Inanimate Object"',\
              '"Pedestrian on Road"','"Stobie Pole"','"Traffic Signal Pole"',\
              '"Tree"','"Utility"']

# Define lists to help filter out invalid values in the following cells
nonAge = ['""'," ","",''""'','"XXX"','"XXXX"']

In [21]:
# Filter out Male drivers, and calculate the mean of their ages
# Here, we use the above defined lists to filter out wrong or inappropriate values for this query
# We also filter for ages < 150, because of the presence of an incorrectly entered record listing an age > 500
UnitsMale = Units\
.filter(lambda x: x[7]=='"Male"')\
.filter(lambda x: x[4] not in nonDrivers)\
.filter(lambda x: x[8] not in nonAge)\
.map(lambda x: x[8].strip('""'))\
.map(lambda x: float(x))\
.filter(lambda x: int(x)<150).mean()

In [22]:
# Filter out female drivers and calculate the mean of their ages
UnitsFemale = Units\
.filter(lambda x: x[7]=='"Female"')\
.filter(lambda x: x[4] not in nonDrivers)\
.filter(lambda x: x[8] not in nonAge)\
.map(lambda x: x[8].strip('""'))\
.map(lambda x: float(x))\
.filter(lambda x: int(x)<150).mean()

In [23]:
print('The average age of male drivers is', round(UnitsMale,2))
print('The average age of female drivers is', round(UnitsFemale,2))

The average age of male drivers is 41.14
The average age of female drivers is 40.41


## 1.3.2

In [37]:
nonAge = ['""'," ","",''""'','"XXX"','"XXXX"']

# Filter incorrectly entered 'Veh Year' values
UnitsVehYear=Units\
.filter(lambda x: x[5] not in nonAge)\
.map(lambda x: x[3:6])

In [38]:
# Find the maximum and minimum values in the 'Veh Year' columns
UnitsMax = [x.strip('""') for x in UnitsVehYear.max(key = lambda x: x[-1])]
UnitsMin = [x.strip('""') for x in UnitsVehYear.min(key = lambda x: x[-1])]

In [39]:
print('Information for the newest vehicle involved in an accident: ',UnitsMax)
print('Information for the oldest vehicle involved in an accident: ',UnitsMin)

Information for the newest vehicle involved in an accident:  ['SA', 'Station Wagon', '2019']
Information for the oldest vehicle involved in an accident:  ['VIC', 'Motor Cycle', '1900']


# 2 - Working with Dataframes
## 2.1 - Data Preparation & Loading
## 2.1.1

In [24]:
# Import 5 CSV Files
Units1 = spark.read.csv("2015_DATA_SA_Units.csv")
Units2 = spark.read.csv("2016_DATA_SA_Units.csv")
Units3 = spark.read.csv("2017_DATA_SA_Units.csv")
Units4 = spark.read.csv("2018_DATA_SA_Units.csv")
Units5 = spark.read.csv("2019_DATA_SA_Units.csv")

# Sequentially merge Unit2, Unit3, Unit4, and then Unit5 into Unit1
UnitsA = Units1.union(Units2)
UnitsB = UnitsA.union(Units3)
UnitsC = UnitsB.union(Units4)
Units = UnitsC.union(Units5)

# Because of the merges, the header would have been repeated 4 times so we drop duplicates
header = Units.first()
Units = Units.filter(col("_c0")!="REPORT_ID")

# Rename the columns based on the header row
for i in range(0,len(Units.columns)):
    Units = Units.withColumnRenamed(Units.columns[i],header[i])

In [25]:
# Import 5 CSV Files
Crash1 = spark.read.csv("2015_DATA_SA_Crash.csv")
Crash2 = spark.read.csv("2016_DATA_SA_Crash.csv")
Crash3 = spark.read.csv("2017_DATA_SA_Crash.csv")
Crash4 = spark.read.csv("2018_DATA_SA_Crash.csv")
Crash5 = spark.read.csv("2019_DATA_SA_Crash.csv")

# Sequentially merge Crash1, Crash3, Crash4, and then Crash5 into Crash1
CrashA = Crash1.union(Crash2)
CrashB = CrashA.union(Crash3)
CrashC = CrashB.union(Crash4)
Crash = CrashC.union(Crash5)

# Because of the merges, the header would have been repeated 4 times so we drop duplicates
header = Crash.first()
Crash = Crash.filter(col("_c0")!="REPORT_ID")

# Rename the columns based on the header row
for i in range(0,len(Crash.columns)):
    Crash = Crash.withColumnRenamed(Crash.columns[i],header[i])

## 2.1.2

In [26]:
Units.printSchema()    
Crash.printSchema()

root
 |-- REPORT_ID: string (nullable = true)
 |-- Unit No: string (nullable = true)
 |-- No Of Cas: string (nullable = true)
 |-- Veh Reg State: string (nullable = true)
 |-- Unit Type: string (nullable = true)
 |-- Veh Year: string (nullable = true)
 |-- Direction Of Travel: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Lic State: string (nullable = true)
 |-- Licence Class: string (nullable = true)
 |-- Licence Type: string (nullable = true)
 |-- Towing: string (nullable = true)
 |-- Unit Movement: string (nullable = true)
 |-- Number Occupants: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- Rollover: string (nullable = true)
 |-- Fire: string (nullable = true)

root
 |-- REPORT_ID: string (nullable = true)
 |-- Stats Area: string (nullable = true)
 |-- Suburb: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- LGA Name: string (nullable = true)
 |-- Total Units: string (nullable = true)


# 2.2 - Query/Analysis
## 2.2.1

In [27]:
# Filter on Adelaide, then Filter on there being >3 casualties, and display relevant columns
Crash221 = Crash\
.filter(col("Suburb")=="ADELAIDE")\
.filter(col("Total Cas")>3)\
.select('REPORT_ID','Suburb','Total Cas','Year','Month','Day')\
.show()

+--------------------+--------+---------+----+--------+--------+
|           REPORT_ID|  Suburb|Total Cas|Year|   Month|     Day|
+--------------------+--------+---------+----+--------+--------+
|2017-1613-15/08/2019|ADELAIDE|        4|2017|February|Saturday|
|2017-12182-15/08/...|ADELAIDE|        5|2017|December|Saturday|
| 2018-601-17/01/2020|ADELAIDE|        4|2018| January|  Sunday|
|2019-10404-8/07/2020|ADELAIDE|        6|2019| October|  Monday|
+--------------------+--------+---------+----+--------+--------+



## 2.2.2

In [28]:
# Order casualties in descending order, and display relevant columns
Crash222 = Crash\
.orderBy((Crash['Total Cas'])\
.desc())\
.select('REPORT_ID','Total Cas','Suburb','Year','Month','Day')\
.show(10)

+--------------------+---------+---------------+----+--------+---------+
|           REPORT_ID|Total Cas|         Suburb|Year|   Month|      Day|
+--------------------+---------+---------------+----+--------+---------+
|2016-6630-15/08/2019|        9|  KANGAROO FLAT|2016|   April|Wednesday|
|2016-3035-15/08/2019|        9|        HACKHAM|2016| January| Saturday|
|2019-11734-8/07/2020|        9|          STURT|2019|November|   Sunday|
|2016-7073-15/08/2019|        8|       MERRITON|2016|   April|   Sunday|
|2015-2823-21/08/2019|        8|         HAWKER|2015|   March|   Monday|
|2016-14407-15/08/...|        8|      STOCKWELL|2016| October|   Sunday|
|2017-7615-15/08/2019|        7| SOUTH PLYMPTON|2017|    July| Saturday|
|2015-13713-21/08/...|        7|ELIZABETH GROVE|2015|November|   Friday|
|2015-12591-21/08/...|        7|        MALLALA|2015| October|   Sunday|
|2016-8547-15/08/2019|        7|        WINDSOR|2016|     May| Saturday|
+--------------------+---------+---------------+---

## 2.2.3

In [29]:
# Group Crash by Crash Type, aggregating on Total Casualties
Crash223 = Crash\
.groupBy('Crash Type')\
.agg({'Total Fats': 'sum'})\
.withColumnRenamed('sum(Total Fats)','Fatalities')

# Order the table in descending order of fatalities
Crash223\
.orderBy(Crash223['Fatalities']\
.desc())\
.show()

+--------------------+----------+
|          Crash Type|Fatalities|
+--------------------+----------+
|    Hit Fixed Object|     152.0|
|             Head On|      86.0|
|      Hit Pedestrian|      70.0|
|           Roll Over|      57.0|
|         Right Angle|      45.0|
|          Side Swipe|      20.0|
|          Right Turn|      18.0|
|            Rear End|      16.0|
|  Hit Parked Vehicle|       9.0|
|          Hit Animal|       4.0|
|  Hit Object on Road|       2.0|
|               Other|       2.0|
|Left Road - Out o...|       1.0|
+--------------------+----------+



## 2.2.4

In [30]:
# Inner Join the filtered DataFrames on REPORT_ID, grouping by Suburb, and aggregating on Total Casualties
Crash224 = Crash\
.join(Units,'REPORT_ID',how = 'inner')\
.groupBy('Suburb')\
.agg({'Total Cas': 'sum'})\
.withColumnRenamed('sum(Total Cas)','Total Casualties')

# Order the table in descending order of casualties
Crash224\
.orderBy(Crash224['Total Casualties']\
.desc())\
.show()

+-----------------+----------------+
|           Suburb|Total Casualties|
+-----------------+----------------+
|         ADELAIDE|          2343.0|
|         PROSPECT|           791.0|
|   NORTH ADELAIDE|           708.0|
|     MAWSON LAKES|           669.0|
|            STURT|           598.0|
|   SALISBURY EAST|           557.0|
|          NORWOOD|           553.0|
|    MORPHETT VALE|           551.0|
|     MOUNT BARKER|           542.0|
|          POORAKA|           481.0|
|    MOUNT GAMBIER|           472.0|
|        SALISBURY|           470.0|
|      GEPPS CROSS|           452.0|
|       INGLE FARM|           425.0|
|         PARADISE|           409.0|
|        WINGFIELD|           405.0|
|          MODBURY|           403.0|
|PARAFIELD GARDENS|           391.0|
|        PARALOWIE|           389.0|
|          KLEMZIG|           368.0|
+-----------------+----------------+
only showing top 20 rows



# 2.3 - Severity Analysis
## 2.3.1

In [31]:
# Group by by CSEF Severity, and aggregate over count
Crash231 = Crash\
.groupBy('CSEF Severity')\
.agg({'CSEF Severity': 'count'})\
.withColumnRenamed('count(CSEF Severity)','Count')

print ('Fatalities across all Severity Types:')

# Arrange in descending order of count
Crash231\
.orderBy(Crash231['Count']\
.desc())\
.show()

Fatalities across all Severity Types:
+-------------+-----+
|CSEF Severity|Count|
+-------------+-----+
|       1: PDO|46696|
|        2: MI|21881|
|        3: SI| 2978|
|     4: Fatal|  451|
+-------------+-----+



#### PDO (Property Damage Only) seems to be the most common Crash Type



## 2.3.2

In [32]:
# Define 4 DataFrames based on the different combinations in Drugs Involved and DUI Involved

# DUI ONLY
CrashDUI = Crash\
.filter(col('DUI Involved')=='Y')\
.filter(col('Drugs Involved').isNull())

# DRUGS ONLY
CrashDrugs = Crash\
.filter(col('Drugs Involved')=='Y')\
.filter(col('DUI Involved').isNull())

# BOTH DUI AND DRUGS
CrashDUIDrugs = Crash\
.filter(col('DUI Involved')=='Y')\
.filter(col('Drugs Involved')=='Y')

# NEITHER DUI NOR DRUGS
CrashNoDUIDrugs = Crash\
.filter(col('Drugs Involved').isNull())\
.filter(col('DUI Involved').isNull())

## 2.3.2 [A - D]

In [33]:
# Text to describe each of 4 outputted tables
Labels = ['Crash Events for Drugs but NO Alcohol:'\
         ,'Crash Events for Alcohol but NO Drugs:'\
         ,'Crash Events for Alcohol AND Drugs:'\
         ,'Crash Events for NO Alcohol and NO Drugs:']

# Names of the 4 DataFrames generated in the previous cell
Dataframes = [CrashDUI,CrashDrugs,CrashDUIDrugs,CrashNoDUIDrugs]

# Iterate through the 4 DataFrames, performing the same grouping and aggregation functions on each
for dataframe in Dataframes:
    
    Crash232 = dataframe\
    .groupBy('CSEF Severity')\
    .agg({'CSEF Severity': 'count'})

    # Rename columns for readability
    Crash232 = Crash232\
    .withColumnRenamed('count(CSEF Severity)','Count')

    # In each DataFrame, collect the sum total of fatalities to be able to get each Severity's percentage
    Crash232Total = sum(Crash232\
                        .select(['Count'])\
                        .rdd.flatMap(lambda x: x)\
                        .collect())
    
    print (Labels[Dataframes\
                  .index(dataframe)])
    
    # Calculate each condition's percentage based on the total derived above, ordering by raw count of Crash events
    Crash232 = Crash232\
    .withColumn('Percentage',100*Crash232['Count']/Crash232Total)\
    .orderBy(Crash232['Count']\
    .desc())\
    .show()

Crash Events for Drugs but NO Alcohol:
+-------------+-----+------------------+
|CSEF Severity|Count|        Percentage|
+-------------+-----+------------------+
|       1: PDO| 1149| 55.42691751085383|
|        2: MI|  648| 31.25904486251809|
|        3: SI|  224|10.805595754944525|
|     4: Fatal|   52|2.5084418716835506|
+-------------+-----+------------------+

Crash Events for Alcohol but NO Drugs:
+-------------+-----+------------------+
|CSEF Severity|Count|        Percentage|
+-------------+-----+------------------+
|        2: MI|  660| 61.16774791473587|
|        3: SI|  212|19.647822057460612|
|       1: PDO|  152|14.087117701575533|
|     4: Fatal|   55| 5.097312326227989|
+-------------+-----+------------------+

Crash Events for Alcohol AND Drugs:
+-------------+-----+------------------+
|CSEF Severity|Count|        Percentage|
+-------------+-----+------------------+
|        2: MI|   89|50.857142857142854|
|        3: SI|   35|              20.0|
|     4: Fatal|   27|15

# 2.4 - RDDs vs DataFrame vs Spark SQL

## [RDD]

In [53]:
# Import 5 CSV Files
Units1 = sc.textFile("2015_DATA_SA_Units.csv")
Units2 = sc.textFile("2016_DATA_SA_Units.csv")
Units3 = sc.textFile("2017_DATA_SA_Units.csv")
Units4 = sc.textFile("2018_DATA_SA_Units.csv")
Units5 = sc.textFile("2019_DATA_SA_Units.csv")

# Sequentially merge Unit2, Unit3, Unit4, and then Unit5 into Unit1
UnitsA = Units1.union(Units2)
UnitsB = UnitsA.union(Units3)
UnitsC = UnitsB.union(Units4)
Units = UnitsC.union(Units5)

# Identify the header row
header = Units.first()

# Filter Units to include every row that is not the header row
Units = Units.filter(lambda x: x != header).map(lambda x: (x.split(',')))

In [54]:
# Import 5 CSV Files
Crash1 = sc.textFile("2015_DATA_SA_Crash.csv")
Crash2 = sc.textFile("2016_DATA_SA_Crash.csv")
Crash3 = sc.textFile("2017_DATA_SA_Crash.csv")
Crash4 = sc.textFile("2018_DATA_SA_Crash.csv")
Crash5 = sc.textFile("2019_DATA_SA_Crash.csv")

# Sequentially merge Crash1, Crash3, Crash4, and then Crash5 into Crash1
CrashA = Crash1.union(Crash2)
CrashB = CrashA.union(Crash3)
CrashC = CrashB.union(Crash4)
Crash = CrashC.union(Crash5)

# Identify the header row
header = Crash.first()

Crash = Crash.filter(lambda x: x != header).map(lambda x: (x.split(',')))

In [55]:
Suburbs = sorted(set(list(Crash.map(lambda x: x[2]).collect())))

In [56]:
# Shorten the 2 RDDs to only include relevant fields to increase responsiveness
Units241 = Units.map(lambda x: (x[0], (x[7],x[8],x[11])))
Crash241 = Crash.map(lambda x: (x[0], (x[2],x[4],x[6],x[10],x[11],x[12],x[13],x[14])))

## [RDD] 2.4.1

In the Crash CSV, some unfortunate formatting such as the inclusion of a comma WITHIN a particular `LGA Name` value ("CC OF NORWOOD,PAYNEHAM & ST PETERS") meant that RDD Operations were not able to easily process an identical schema for every `LGA Name`. To combat this, I processed the offending rows separately and simply joined the results at the end. As a result, both sets of operations differ slightly in index values used, but are otherwise entirely identical in terms of input and output.

In [57]:
%%time

# List of invalid values that are filtered out
# Values marked 'UNKNOWN' etc are left in, because those are not wrong, simply unknown for other reasons, and are part of a valid record
invalidValues = ['""'," ","",''""'','"XXX"']

# Join the 2 RDDs
CrashUnitsRDD = Crash241\
.join(Units241)

# Filter for Adelaide, and continually append the request values to one another using Python strings
CrashUnitsList1 = CrashUnitsRDD\
.filter(lambda x: x[1][0][0]=='"ADELAIDE"')\
.filter(lambda x: x[1][1][1] not in invalidValues)\
.filter(lambda x: x[1][1][2] != '')\
.map(lambda x:\
     str(x[1][0][3]).strip('""')\
     +'-'+str(x[1][0][4]).strip('""')\
     +'-'+str(x[1][0][5]).strip('""')\
     +', '+str(x[1][0][6]).strip('""')\
     +', '+str(x[1][0][2]).strip('""')\
     +', '+str(x[1][1][0]).strip('""')\
     +', '+str(x[1][1][1]).strip('""')\
     +', '+str(x[1][1][2]).strip('""'))\
.collect()

# Repeat the same process as above, but including an additional filter for "CC OF NORWOOD,PAYNEHAM & ST PETERS".
# Some of the indexing values also differ as a result of the incorrect formatting
CrashUnitsList2 =CrashUnitsRDD\
.filter(lambda x: x[1][0][0]=='"ADELAIDE"')\
.filter(lambda x: x[1][0][1]=='"CC OF NORWOOD')\
.filter(lambda x: x[1][1][1] not in invalidValues)\
.filter(lambda x: x[1][1][2] != '')\
.map(lambda x:\
     str(x[1][0][4]).strip('""')\
     +'-'+str(x[1][0][5]).strip('""')\
     +'-'+str(x[1][0][6]).strip('""')\
     +', '+str(x[1][0][7]).strip('""')\
     +', '+str(x[1][0][2]).strip('""')\
     +', '+str(x[1][1][0]).strip('""')\
     +', '+str(x[1][1][1]).strip('""')\
     +', '+str(x[1][1][2]).strip('""'))\
.collect()

CPU times: user 39.1 ms, sys: 5.53 ms, total: 44.6 ms
Wall time: 2.46 s


In [58]:
# For better readability, present the above RDDs as a Pandas Dataframe

# First, concatenate the 2 RDDs
data = CrashUnitsList1 + CrashUnitsList2

# Split the list values, and create a Pandas DataFrame
df = pd.DataFrame([x.split(',') for x in data])

# Rename the columns
df = df.rename(columns={0: "Date", 1: "Time", 2: "Casualties", 3:"Gender", 4:"Age", 5:"Licence Type"})
df

Unnamed: 0,Date,Time,Casualties,Gender,Age,Licence Type
0,2015-January-Friday,06:10 pm,1,Female,035,Full
1,2015-January-Friday,06:10 pm,1,Female,037,Full
2,2015-January-Friday,05:00 pm,0,Male,041,Full
3,2015-January-Tuesday,03:10 pm,0,Female,044,Full
4,2015-January-Tuesday,03:10 pm,0,Male,061,Full
...,...,...,...,...,...,...
4498,2016-January-Thursday,04:30 pm,2,Male,017,Provisional 1
4499,2016-January-Thursday,04:30 pm,2,Female,029,Full
4500,2019-October-Friday,11:00 am,2,Male,063,Full
4501,2016-October-Sunday,09:35 pm,2,Male,025,Unknown


## [RDD] 2.4.2

In [96]:
Units242 = Units.map(lambda x: (x[0], (x[7],x[8],x[11])))
Crash242 = Crash.map(lambda x: (x[0], (x[2],x[4],x[6],x[10],x[11],x[12],x[13],x[14])))

In [98]:
%%time

# Collect an alphabetically set of all unique Suburbs identified in the Crash RDD

# For each unique suburb:
for suburb in Suburbs:

    # Join the 2 RDDs, filter for the suburb in question, and filter for an 'Unlicenced' Licence Type
    CrashUnits = Crash242\
    .join(Units242)\
    .filter(lambda x: x[1][0][0] == suburb)\
    .filter(lambda x: x[1][1][-1] == '"Unlicenced"')
    
    # Preliminarily stop the loop if there are no 'Unlicenced' Licence Type crashes for that suburb
    if len(CrashUnits.collect())==0:
        print ("['SUBURB:",suburb,", CASUALTIES: 0']")
        
    # ReduceByKey on (suburb,casualties) for each such combination found per row for the suburb in question
    # This gives (suburb, total casualties for the suburb)
    else:
        CrashUnits = CrashUnits\
        .map(lambda x: (x[1][0][0], x[1][0][2]))\
        .reduceByKey(lambda x,y: (int(x)+(int(y))))\
        .map(lambda x: 'SUBURB: '+suburb+', CASUALTIES: '+str(x[1]))            

        print(CrashUnits.collect())

['SUBURB: "  PARAFIELD" , CASUALTIES: 0']
['SUBURB: "ABERFOYLE PARK", CASUALTIES: 1']
['SUBURB: "ABMINGA STATION" , CASUALTIES: 0']
['SUBURB: "ADELAIDE AIRPORT" , CASUALTIES: 0']
['SUBURB: "ADELAIDE", CASUALTIES: 19']
['SUBURB: "AGERY" , CASUALTIES: 0']
['SUBURB: "ALAWOONA" , CASUALTIES: 0']
['SUBURB: "ALBERT PARK", CASUALTIES: 2']
['SUBURB: "ALBERTON", CASUALTIES: 2']
['SUBURB: "ALDGATE" , CASUALTIES: 0']
['SUBURB: "ALDINGA BEACH", CASUALTIES: 2']
['SUBURB: "ALDINGA", CASUALTIES: 3']
['SUBURB: "ALFORD" , CASUALTIES: 0']
['SUBURB: "ALLANDALE STATION" , CASUALTIES: 0']
['SUBURB: "ALLENBY GARDENS", CASUALTIES: 2']
['SUBURB: "ALLENDALE EAST", CASUALTIES: 1']
['SUBURB: "ALLENDALE NORTH" , CASUALTIES: 0']
['SUBURB: "ALMA", CASUALTIES: 0']
['SUBURB: "ALPANA" , CASUALTIES: 0']
['SUBURB: "ALTONA" , CASUALTIES: 0']
['SUBURB: "AMATA", CASUALTIES: 1']
['SUBURB: "AMERICAN RIVER" , CASUALTIES: 0']
['SUBURB: "ANAMA" , CASUALTIES: 0']
['SUBURB: "ANANGU PITJANTJATJARA YANKUNYTJATARA", CASUALTIES: 2']


## [DataFrame]

In [42]:
# Import 5 CSV Files
Crash1 = spark.read.csv("2015_DATA_SA_Crash.csv")
Crash2 = spark.read.csv("2016_DATA_SA_Crash.csv")
Crash3 = spark.read.csv("2017_DATA_SA_Crash.csv")
Crash4 = spark.read.csv("2018_DATA_SA_Crash.csv")
Crash5 = spark.read.csv("2019_DATA_SA_Crash.csv")

# Sequentially merge Crash1, Crash3, Crash4, and then Crash5 into Crash1
CrashA = Crash1.union(Crash2)
CrashB = CrashA.union(Crash3)
CrashC = CrashB.union(Crash4)
Crash = CrashC.union(Crash5)

# Because of the merges, the header would have been repeated 4 times so we drop duplicates
CrashSchema = Crash.first()
Crash = Crash.filter(col("_c0")!="REPORT_ID")

# Rename columns
for i in range(0,len(Crash.columns)):
    Crash = Crash.withColumnRenamed(Crash.columns[i],CrashSchema[i])

In [43]:
# Import 5 CSV Files
Units1 = spark.read.csv("2015_DATA_SA_Units.csv")
Units2 = spark.read.csv("2016_DATA_SA_Units.csv")
Units3 = spark.read.csv("2017_DATA_SA_Units.csv")
Units4 = spark.read.csv("2018_DATA_SA_Units.csv")
Units5 = spark.read.csv("2019_DATA_SA_Units.csv")

# Sequentially merge Unit2, Unit3, Unit4, and then Unit5 into Unit1
UnitsA = Units1.union(Units2)
UnitsB = UnitsA.union(Units3)
UnitsC = UnitsB.union(Units4)
Units = UnitsC.union(Units5)

# Because of the merges, the header would have been repeated 4 times so we drop duplicates
UnitsSchema = Units.first()
Units = Units.filter(col("_c0")!="REPORT_ID")

# Rename columns
for i in range(0,len(Units.columns)):
    Units = Units.withColumnRenamed(Units.columns[i],UnitsSchema[i])

## [DataFrame] 2.4.1

In [44]:
%%time

# Join the DataFrames, Filter for Suburb, Filter out invalid Ages, Filter out NULL Licence Types
# Format the Date information correctly, Include all other relevant columns, rename, and show
Crash241 = Crash\
.join(Units,'REPORT_ID',how = 'inner')\
.filter(col('Suburb') == 'ADELAIDE')\
.filter(col('Age') != 'XXX')\
.filter(col('Licence Type').isNotNull())\
.withColumn('YearMonthDay',(concat(col("Year"), lit("-"), col("Month"), lit("-"), col("Day"))))\
.select(['YearMonthDay','Time','Total Cas','Sex','Age','Licence Type'])\
.withColumnRenamed('YearMonthDay','Date')\
.show()

+--------------------+--------+---------+------+---+--------------+
|                Date|    Time|Total Cas|   Sex|Age|  Licence Type|
+--------------------+--------+---------+------+---+--------------+
|2015-January-Tuesday|08:30 am|        0|  Male|030|          Full|
|2015-January-Tuesday|08:30 am|        0|  Male|042|          Full|
|2015-January-Thur...|06:30 pm|        0|  Male|025|          Full|
|2015-January-Thur...|06:30 pm|        0|  Male|055|          Full|
| 2015-January-Friday|05:00 pm|        0|  Male|028|          Full|
| 2015-January-Friday|06:10 pm|        1|Female|035|          Full|
| 2015-January-Friday|06:10 pm|        1|Female|037|          Full|
| 2015-January-Friday|05:00 pm|        0|  Male|041|          Full|
|2015-January-Wedn...|09:45 am|        0|Female|058|          Full|
|2015-January-Satu...|01:55 pm|        0|  Male|025|          Full|
|2015-February-Tue...|04:20 pm|        1|Female|040|          Full|
|2015-February-Tue...|04:20 pm|        1|Female|

## [DataFrame] 2.4.2

In [45]:
%%time

# Join the DataFrames, Filter for Unlicenced Licence Types
# Group by Suburb, Aggregating the sum of Casualties and renaming the aggregated column
Crash242 = Crash\
.join(Units,'REPORT_ID',how = 'inner')\
.filter(col('Licence Type')=='Unlicenced')\
.groupBy('Suburb')\
.agg({'Total Cas': 'sum'})\
.withColumnRenamed('sum(Total Cas)','Casualties')

# Sort the returned DataFrame by Casualties in descending order
Crash242\
.orderBy(Crash242['Casualties']\
.desc())\
.show()

+---------------+----------+
|         Suburb|Casualties|
+---------------+----------+
|       ADELAIDE|      19.0|
|      SALISBURY|      18.0|
|      DRY CREEK|      18.0|
| SALISBURY EAST|      16.0|
|       PROSPECT|      14.0|
| NORTH ADELAIDE|      13.0|
|        ENFIELD|      12.0|
|   ANDREWS FARM|      12.0|
|SALISBURY SOUTH|      11.0|
|SALISBURY DOWNS|      11.0|
|   BEDFORD PARK|      11.0|
|     INGLE FARM|      11.0|
|   MOUNT BARKER|      10.0|
|     MUNNO PARA|      10.0|
|  MORPHETT VALE|      10.0|
|SALISBURY PLAIN|      10.0|
| ELIZABETH PARK|      10.0|
|   MAWSON LAKES|      10.0|
|         BURTON|      10.0|
|    HOLDEN HILL|       9.0|
+---------------+----------+
only showing top 20 rows

CPU times: user 5.06 ms, sys: 1.91 ms, total: 6.97 ms
Wall time: 1.71 s


## [Spark SQL]

In [46]:
# 'Cast' the DataFrames as temporary views to use SQL on
Units.createOrReplaceTempView("Units")
Crash.createOrReplaceTempView("Crash")

## [Spark SQL] 2.4.1

In [47]:
%%time

# SELECT the columns to take (creating the Date column in the process)
# FROM Crash JOIN Units where the Suburb is 'Adelaide'
sql241 = spark.sql\
("SELECT Year || '-' || Month || '-' || Day as Date, Time, \
`Total Cas`, Sex, Age, `Licence Type` \
FROM Crash INNER JOIN Units ON Crash.REPORT_ID = Units.REPORT_ID \
WHERE Crash.Suburb='ADELAIDE'")\
.collect()

CPU times: user 20.6 ms, sys: 1.07 ms, total: 21.7 ms
Wall time: 1.05 s


In [48]:
# Represent the DataFrame as a Pandas DataFrame for better readability
df = pd.DataFrame(sql241)

# Rename the columns
df = df.rename(columns={0: "Date", 1: "Time", 2: "Casualties", 3:"Gender", 4:"Age", 5:"Licence Type"})
df

Unnamed: 0,Date,Time,Casualties,Gender,Age,Licence Type
0,2015-January-Tuesday,08:30 am,0,Male,030,Full
1,2015-January-Tuesday,08:30 am,0,Male,042,Full
2,2015-January-Thursday,06:30 pm,0,Male,025,Full
3,2015-January-Thursday,06:30 pm,0,Male,055,Full
4,2015-January-Friday,05:00 pm,0,Male,028,Full
...,...,...,...,...,...,...
6305,2019-December-Sunday,09:19 am,0,Male,057,Full
6306,2019-December-Tuesday,03:40 pm,0,Male,065,Full
6307,2019-December-Tuesday,03:40 pm,0,Female,056,Full
6308,2019-December-Tuesday,03:00 pm,0,Male,026,Full


## [Spark SQL] 2.4.2

In [49]:
%%time

# SELECT the columns, specifying to  to take FROM Crash JOIN Units
# WHERE Licence Type is Unlicenced
sql242 = spark.sql("SELECT  Suburb, SUM(`Total Cas`) AS `Casualties` \
FROM Crash INNER JOIN Units ON Crash.REPORT_ID = Units.REPORT_ID \
WHERE Units.`Licence Type`='Unlicenced' \
GROUP BY Suburb \
ORDER BY CASUALTIES DESC") \
.collect()

CPU times: user 4.55 ms, sys: 1.81 ms, total: 6.35 ms
Wall time: 2.17 s


In [50]:
# Represent the DataFrame as a Pandas DataFrame for better readability
df = pd.DataFrame(sql242)

# Rename the columns
df = df.rename(columns={0: "Suburb", 1: "Casualties"})
df

Unnamed: 0,Suburb,Casualties
0,ADELAIDE,19.0
1,DRY CREEK,18.0
2,SALISBURY,18.0
3,SALISBURY EAST,16.0
4,PROSPECT,14.0
...,...,...
629,REEVES PLAINS,0.0
630,FULHAM,0.0
631,STRUAN,0.0
632,BIRDWOOD,0.0
