In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Run Spark locally, using every core available on this machine.
master = "local[*]"
app_name = "A1"

# Build the configuration step by step; each setter updates spark_conf in place.
spark_conf = SparkConf()
spark_conf.setMaster(master)
spark_conf.setAppName(app_name)

# Reuse an existing session if one is already running, otherwise start one.
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext

In [3]:
# Display the SparkSession created above as the cell output.
spark

In [4]:
import csv

# One crash extract per calendar year, 2015 through 2019 inclusive.
years = range(2015, 2020)
crashes_files = [f"{year}_DATA_SA_Crash.csv" for year in years]

# Accumulate the data rows of every yearly file into one list of string lists.
crashes_list = []
for crashes_file in crashes_files:
    # newline="" is what the csv module expects: it lets the reader handle
    # line endings itself, so quoted fields containing newlines parse correctly.
    with open(crashes_file, newline="") as file:
        reader = csv.reader(file)
        next(reader, None)  # skip the header row (no-op on an empty file)
        crashes_list.extend(reader)

In [5]:
# Preview only the first few parsed rows; echoing the whole list would put
# every crash record into the notebook output.
crashes_list[:3]

[['2015-1-21/08/2019',
  '2 Metropolitan',
  'ELIZABETH VALE',
  '5112',
  'CITY OF PLAYFORD.',
  '2',
  '1',
  '0',
  '0',
  '1',
  '2015',
  'January',
  'Wednesday',
  '01:00 pm',
  '060',
  'T-Junction',
  'Straight road',
  'Level',
  'Not Applicable',
  'Sealed',
  'Dry',
  'Not Raining',
  'Daylight',
  'Side Swipe',
  '01',
  'Driver Rider',
  '2: MI',
  'No Control',
  '',
  '',
  '1335254.54',
  '1690056.88',
  '13352551690057'],
 ['2015-2-21/08/2019',
  '2 Metropolitan',
  'SALISBURY',
  '5108',
  'CITY OF SALISBURY',
  '2',
  '1',
  '0',
  '0',
  '1',
  '2015',
  'February',
  'Tuesday',
  '03:38 pm',
  '060',
  'Cross Road',
  'Straight road',
  'Level',
  'Not Applicable',
  'Sealed',
  'Dry',
  'Not Raining',
  'Daylight',
  'Rear End',
  '01',
  'Driver Rider',
  '2: MI',
  'Traffic Signals',
  '',
  '',
  '1333389.6',
  '1688248.34',
  '13333901688248'],
 ['2015-3-21/08/2019',
  '2 Metropolitan',
  'ST MARYS',
  '5042',
  'CC MITCHAM.                   ',
  '2',
  '1',

In [6]:
import csv

# One units extract per calendar year, 2015 through 2019 inclusive.
years = range(2015, 2020)
units_files = [f"{year}_DATA_SA_Units.csv" for year in years]

# Accumulate the data rows of every yearly file into one list of string lists.
units_list = []
for units_file in units_files:
    # newline="" is what the csv module expects: it lets the reader handle
    # line endings itself, so quoted fields containing newlines parse correctly.
    with open(units_file, newline="") as file:
        reader = csv.reader(file)
        next(reader, None)  # skip the header row (no-op on an empty file)
        units_list.extend(reader)

In [7]:
# Distribute both in-memory row lists as RDDs (crashes first, then units).
crashes_rdd, units_rdd = (sc.parallelize(rows) for rows in (crashes_list, units_list))

In [8]:
# Show the first ten crash records held in the RDD.
crashes_rdd.take(10)

[['2015-1-21/08/2019',
  '2 Metropolitan',
  'ELIZABETH VALE',
  '5112',
  'CITY OF PLAYFORD.',
  '2',
  '1',
  '0',
  '0',
  '1',
  '2015',
  'January',
  'Wednesday',
  '01:00 pm',
  '060',
  'T-Junction',
  'Straight road',
  'Level',
  'Not Applicable',
  'Sealed',
  'Dry',
  'Not Raining',
  'Daylight',
  'Side Swipe',
  '01',
  'Driver Rider',
  '2: MI',
  'No Control',
  '',
  '',
  '1335254.54',
  '1690056.88',
  '13352551690057'],
 ['2015-2-21/08/2019',
  '2 Metropolitan',
  'SALISBURY',
  '5108',
  'CITY OF SALISBURY',
  '2',
  '1',
  '0',
  '0',
  '1',
  '2015',
  'February',
  'Tuesday',
  '03:38 pm',
  '060',
  'Cross Road',
  'Straight road',
  'Level',
  'Not Applicable',
  'Sealed',
  'Dry',
  'Not Raining',
  'Daylight',
  'Rear End',
  '01',
  'Driver Rider',
  '2: MI',
  'Traffic Signals',
  '',
  '',
  '1333389.6',
  '1688248.34',
  '13333901688248'],
 ['2015-3-21/08/2019',
  '2 Metropolitan',
  'ST MARYS',
  '5042',
  'CC MITCHAM.                   ',
  '2',
  '1',

In [9]:
# Show the first ten unit records held in the RDD.
units_rdd.take(10)

[['2015-1-21/08/2019',
  '01',
  '0',
  'SA',
  'RIGID TRUCK LGE GE 4.5T',
  '1999',
  'North East',
  'Male',
  '052',
  'SA',
  'HRR ',
  'Full',
  'Not Towing',
  'Swerving',
  '001',
  '5109',
  '',
  ''],
 ['2015-1-21/08/2019',
  '02',
  '1',
  'SA',
  'Motor Cars - Sedan',
  '2009',
  'North East',
  'Female',
  '057',
  'SA',
  'C ',
  'Full',
  'Not Towing',
  'Straight Ahead',
  '002',
  '5125',
  '',
  ''],
 ['2015-2-21/08/2019',
  '01',
  '0',
  'SA',
  'Motor Cars - Sedan',
  '2009',
  'South East',
  'Male',
  '020',
  'SA',
  'MR',
  'Provisional 1 ',
  'Not Towing',
  'Straight Ahead',
  '001',
  '5110',
  '',
  ''],
 ['2015-2-21/08/2019',
  '02',
  '1',
  'SA',
  'Motor Cars - Sedan',
  '1994',
  'South East',
  'Female',
  '021',
  'SA',
  'C ',
  'Full',
  'Not Towing',
  'Stopped on Carriageway',
  '001',
  '5096',
  '',
  ''],
 ['2015-3-21/08/2019',
  '01',
  '0',
  'SA',
  'Motor Cars - Sedan',
  '2008',
  'North East',
  'Male',
  '023',
  'SA',
  'C ',
  'Full',


In [10]:
# Total number of crash records across all loaded files.
crashes_rdd.count()

72006

In [11]:
# Total number of unit records across all loaded files.
units_rdd.count()

153854

### 1.2 RDD partitioning

In [12]:
# Number of partitions Spark chose for the units RDD.
units_rdd.getNumPartitions()

2

In [13]:
# glom() turns each partition into one list; collect() then brings every
# partition (and therefore every record) back to the driver.
# NOTE(review): only the lengths are used afterwards, so this transfers far more than needed.
partitions = units_rdd.glom().collect()

In [14]:
# Record count of each default partition, in partition order.
print(list(map(len, partitions)))

[76800, 77054]


In [91]:
# Build (key, value) pairs: the key is column 9 of each unit record (presumably
# the licence state, judging by lic_state_function) and the value is the record
# with that column removed.
units_kv = units_rdd.map(lambda row: (row[9], [*row[:9], *row[10:]]))
units_kv.take(10)

[('SA',
  ['2015-1-21/08/2019',
   '01',
   '0',
   'SA',
   'RIGID TRUCK LGE GE 4.5T',
   '1999',
   'North East',
   'Male',
   '052',
   'HRR ',
   'Full',
   'Not Towing',
   'Swerving',
   '001',
   '5109',
   '',
   '']),
 ('SA',
  ['2015-1-21/08/2019',
   '02',
   '1',
   'SA',
   'Motor Cars - Sedan',
   '2009',
   'North East',
   'Female',
   '057',
   'C ',
   'Full',
   'Not Towing',
   'Straight Ahead',
   '002',
   '5125',
   '',
   '']),
 ('SA',
  ['2015-2-21/08/2019',
   '01',
   '0',
   'SA',
   'Motor Cars - Sedan',
   '2009',
   'South East',
   'Male',
   '020',
   'MR',
   'Provisional 1 ',
   'Not Towing',
   'Straight Ahead',
   '001',
   '5110',
   '',
   '']),
 ('SA',
  ['2015-2-21/08/2019',
   '02',
   '1',
   'SA',
   'Motor Cars - Sedan',
   '1994',
   'South East',
   'Female',
   '021',
   'C ',
   'Full',
   'Not Towing',
   'Stopped on Carriageway',
   '001',
   '5096',
   '',
   '']),
 ('SA',
  ['2015-3-21/08/2019',
   '01',
   '0',
   'SA',
   'Motor C

In [16]:
def lic_state_function(key):
    """Partition function: send the key "SA" to partition 0, any other key to partition 1."""
    return 0 if key == "SA" else 1

# Two partitions: lic_state_function decides which one each key lands in.
units_partitioned = units_kv.partitionBy(numPartitions=2, partitionFunc=lic_state_function)

In [17]:
def print_partitions(rdd):
    """Print how many partitions ``rdd`` has and how many records each one holds.

    The per-partition lengths are computed before collecting, so only one
    integer per partition is returned to the driver instead of every record.

    Args:
        rdd: An RDD (or any object offering ``glom().map(...).collect()``).
    """
    partition_sizes = rdd.glom().map(len).collect()
    print(f"Total partitions number : {len(partition_sizes)}")
    for size in partition_sizes:
        # The value printed here is the record count of the partition.
        print(f"Partition number:{size}")
        
# Report the partition layout produced by the custom partition function.
print_partitions(units_partitioned)

Total partitions number : 2
Partition number:109684
Partition number:44170


### 1.3 RDD queries

In [18]:
# Keep 'Male' records whose age field (column 8) is purely numeric.
units_male = units_rdd.filter(lambda row: row[7] == 'Male' and row[8].isnumeric())
units_male_age = units_male.map(lambda row: int(row[8]))
# Average age = sum of ages divided by the number of ages.
units_male_age.reduce(lambda a, b: a + b) / units_male_age.count()

40.975960299920004

In [19]:
# Keep 'Female' records whose age field (column 8) is purely numeric.
units_female = units_rdd.filter(lambda row: row[7] == 'Female' and row[8].isnumeric())
units_female_age = units_female.map(lambda row: int(row[8]))
# Average age = sum of ages divided by the number of ages.
units_female_age.reduce(lambda a, b: a + b) / units_female_age.count()

40.38729268862415

In [20]:
# Pair each record that has a numeric age with (sex, (age, 1)).
units_sex_age = (
    units_rdd
    .filter(lambda row: row[8].isnumeric())
    .map(lambda row: (row[7], (int(row[8]), 1)))
)
# Per sex, add up the ages and the counters, then divide the age total by the count.
(
    units_sex_age
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda totals: totals[0] / totals[1])
    .collect()
)

[('Male', 40.975960299920004),
 ('Female', 40.38729268862415),
 ('Unknown', 32.55813953488372)]

In [21]:
# Smallest numeric value in column 5, reported as (column 9, column 5, column 4).
(
    units_rdd
    .filter(lambda row: row[5].isnumeric())
    .sortBy(lambda row: int(row[5]))
    .map(lambda row: (row[9], row[5], row[4]))
    .take(1)
)

[('VIC', '1900', 'Motor Cycle')]

In [22]:
# Largest numeric value in column 5, reported as (column 9, column 5, column 4).
(
    units_rdd
    .filter(lambda row: row[5].isnumeric())
    .sortBy(lambda row: int(row[5]), ascending=False)
    .map(lambda row: (row[9], row[5], row[4]))
    .take(1)
)

[('SA', '2019', 'Station Wagon')]

### 2 DataFrame and Spark SQL queries

In [30]:
# Read every matching CSV in the working directory; the first line of each file
# is the header and column types are inferred from the data.
crashes = spark.read.options(header=True, inferSchema=True).csv("./*_Crash.csv")
units = spark.read.options(header=True, inferSchema=True).csv("./*_Units.csv")

In [31]:
# Inspect the inferred column names and types of both DataFrames.
crashes.printSchema()
units.printSchema()

root
 |-- REPORT_ID: string (nullable = true)
 |-- Stats Area: string (nullable = true)
 |-- Suburb: string (nullable = true)
 |-- Postcode: integer (nullable = true)
 |-- LGA Name: string (nullable = true)
 |-- Total Units: integer (nullable = true)
 |-- Total Cas: integer (nullable = true)
 |-- Total Fats: integer (nullable = true)
 |-- Total SI: integer (nullable = true)
 |-- Total MI: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Area Speed: integer (nullable = true)
 |-- Position Type: string (nullable = true)
 |-- Horizontal Align: string (nullable = true)
 |-- Vertical Align: string (nullable = true)
 |-- Other Feat: string (nullable = true)
 |-- Road Surface: string (nullable = true)
 |-- Moisture Cond: string (nullable = true)
 |-- Weather Cond: string (nullable = true)
 |-- DayNight: string (nullable = true)
 |-- Crash Type: string (nullable = true

In [32]:
# Average age per sex.
units.groupBy("Sex").agg(F.avg("Age")).show()

+-------+------------------+
|    Sex|          avg(Age)|
+-------+------------------+
|   null|              null|
| Female| 40.38729268862415|
|Unknown| 32.55813953488372|
|   Male|40.975960299920004|
+-------+------------------+



In [34]:
# Crashes in the suburb ADELAIDE with more than three casualties.
crashes.where((F.col("Suburb") == "ADELAIDE") & (F.col("Total Cas") > 3)).show()

+--------------------+----------+--------+--------+----------------+-----------+---------+----------+--------+--------+----+--------+--------+--------+----------+-------------+----------------+--------------+--------------------+------------+-------------+------------+--------+--------------+---------+------------+-------------+---------------+------------+--------------+----------+----------+--------------+
|           REPORT_ID|Stats Area|  Suburb|Postcode|        LGA Name|Total Units|Total Cas|Total Fats|Total SI|Total MI|Year|   Month|     Day|    Time|Area Speed|Position Type|Horizontal Align|Vertical Align|          Other Feat|Road Surface|Moisture Cond|Weather Cond|DayNight|    Crash Type|Unit Resp| Entity Code|CSEF Severity|  Traffic Ctrls|DUI Involved|Drugs Involved|  ACCLOC_X|  ACCLOC_Y|    UNIQUE_LOC|
+--------------------+----------+--------+--------+----------------+-----------+---------+----------+--------+--------+----+--------+--------+--------+----------+-------------+

In [36]:
# Ten crashes with the highest casualty totals.
crashes.orderBy(F.desc("Total Cas")).show(10)

+--------------------+--------------+---------------+--------+--------------------+-----------+---------+----------+--------+--------+----+--------+---------+--------+----------+-------------+--------------------+--------------+--------------+------------+-------------+------------+--------+-----------+---------+------------+-------------+---------------+------------+--------------+----------+----------+--------------+
|           REPORT_ID|    Stats Area|         Suburb|Postcode|            LGA Name|Total Units|Total Cas|Total Fats|Total SI|Total MI|Year|   Month|      Day|    Time|Area Speed|Position Type|    Horizontal Align|Vertical Align|    Other Feat|Road Surface|Moisture Cond|Weather Cond|DayNight| Crash Type|Unit Resp| Entity Code|CSEF Severity|  Traffic Ctrls|DUI Involved|Drugs Involved|  ACCLOC_X|  ACCLOC_Y|    UNIQUE_LOC|
+--------------------+--------------+---------------+--------+--------------------+-----------+---------+----------+--------+--------+----+--------+------

In [38]:
# Total fatalities for each crash type.
crashes.groupBy("Crash Type").agg(F.sum("Total Fats")).show()

+--------------------+---------------+
|          Crash Type|sum(Total Fats)|
+--------------------+---------------+
|           Roll Over|             57|
|  Hit Object on Road|              2|
|      Hit Pedestrian|             70|
|    Hit Fixed Object|            152|
|               Other|              2|
|          Side Swipe|             20|
|             Head On|             86|
|  Hit Parked Vehicle|              9|
|          Right Turn|             18|
|            Rear End|             16|
|          Hit Animal|              4|
|Left Road - Out o...|              1|
|         Right Angle|             45|
+--------------------+---------------+



In [40]:
# Number of unit records for each licence type (null is reported as its own group).
units.groupby("Licence Type").count().show()

+--------------+-----+
|  Licence Type|count|
+--------------+-----+
|          null|28698|
|    Unlicenced| 2282|
|   Conditional|   22|
|  Disqualified|  192|
|       Unknown|13466|
|      Learners| 1525|
|  Probationary|  689|
|          Full|94655|
| Provisional 2| 6079|
|Provisional 1 | 6246|
+--------------+-----+



In [43]:
# Attach crash details to every unit whose licence type is "Unlicenced".
reports = (
    units
    .where(F.col("Licence Type") == "Unlicenced")
    .join(crashes, on=units.REPORT_ID == crashes.REPORT_ID)
)
# Sum casualties per suburb over the joined rows; a crash with several
# unlicenced units contributes its Total Cas once per such unit.
reports.groupBy("Suburb").sum("Total Cas").show()

+-------------------+--------------+
|             Suburb|sum(Total Cas)|
+-------------------+--------------+
|      FLINDERS PARK|             8|
|       POOGINAGORIC|             1|
|     TEA TREE GULLY|             1|
|            HACKHAM|             3|
|   MEDINDIE GARDENS|             0|
|           WISANGER|             1|
|            CUMMINS|             0|
|       BASKET RANGE|             0|
|MURRAY BRIDGE SOUTH|             0|
|      GILLES PLAINS|             7|
|             HAWKER|             0|
|           BEAUFORT|             1|
|             MAGILL|             7|
|            ECHUNGA|             1|
|            CULTANA|             1|
|        EDWARDSTOWN|             6|
|        RISDON PARK|             1|
|          THORNGATE|             1|
|       ANDREWS FARM|            12|
|       TORRENSVILLE|             5|
+-------------------+--------------+
only showing top 20 rows



In [66]:
# Crashes flagged as drug-involved.
drug_involved = crashes.where(F.col("Drugs Involved") == "Y")
total = drug_involved.count()
# Row count per severity level, plus each level's share of the flagged crashes.
count_by_CSEF = drug_involved.groupBy("CSEF Severity").count()
count_by_CSEF.withColumn("percentage", F.col("count") * 100 / total).show()

+-------------+-----+------------------+
|CSEF Severity|count|        percentage|
+-------------+-----+------------------+
|     4: Fatal|   82| 6.539074960127592|
|        2: MI|  749|59.728867623604465|
|       1: PDO|  176|14.035087719298245|
|        3: SI|  247|19.696969696969695|
+-------------+-----+------------------+



In [68]:
# Crashes flagged as both drug-involved and DUI-involved.
drug__DUI_involved = crashes.where(
    (F.col("Drugs Involved") == "Y") & (F.col("DUI Involved") == "Y")
)
total = drug__DUI_involved.count()
# Row count per severity level, plus each level's share of the flagged crashes.
count = drug__DUI_involved.groupBy("CSEF Severity").count()
count.withColumn("percentage", F.col("count") * 100 / total).show()

+-------------+-----+------------------+
|CSEF Severity|count|        percentage|
+-------------+-----+------------------+
|     4: Fatal|   27|15.428571428571429|
|        2: MI|   89|50.857142857142854|
|       1: PDO|   24|13.714285714285714|
|        3: SI|   35|              20.0|
+-------------+-----+------------------+



In [70]:
# Crashes where both the drug and the DUI flag are null.
null_involved = crashes.where(
    F.col("Drugs Involved").isNull() & F.col("DUI Involved").isNull()
)
total = null_involved.count()
# Row count per severity level, plus each level's share of these crashes.
count = null_involved.groupBy("CSEF Severity").count()
count.withColumn("percentage", F.col("count") * 100 / total).show()

+-------------+-----+------------------+
|CSEF Severity|count|        percentage|
+-------------+-----+------------------+
|     4: Fatal|  317|0.4615675825215859|
|        2: MI|20484|29.825710916000524|
|       1: PDO|45371| 66.06240626683557|
|        3: SI| 2507|3.6503152346423215|
+-------------+-----+------------------+



In [75]:
%%time
data1 = crashes.filter(F.col("suburb") == "ADELAIDE").join(units, units.REPORT_ID == crashes.REPORT_ID)\
.withColumn("Date", F.concat(F.col("Year"), F.lit("-"), F.col("Month"), F.lit("-"), F.col("Day")))\
.select("Date", "Time", "Total CAS", "Sex", "Age", "Licence Type").collect()

CPU times: user 42.5 ms, sys: 2.63 ms, total: 45.1 ms
Wall time: 1.58 s


[Row(Date='2016-November-Wednesday', Time='01:45 pm', Total CAS=1, Sex='Male', Age='056', Licence Type='Full'),
 Row(Date='2016-November-Wednesday', Time='01:45 pm', Total CAS=1, Sex='Male', Age='072', Licence Type=None),
 Row(Date='2016-November-Tuesday', Time='03:40 pm', Total CAS=1, Sex='Male', Age='056', Licence Type=None),
 Row(Date='2016-November-Tuesday', Time='03:40 pm', Total CAS=1, Sex='Female', Age='027', Licence Type=None),
 Row(Date='2016-November-Tuesday', Time='05:00 pm', Total CAS=0, Sex='Female', Age='032', Licence Type='Full'),
 Row(Date='2016-November-Tuesday', Time='05:00 pm', Total CAS=0, Sex='Unknown', Age='XXX', Licence Type='Unknown'),
 Row(Date='2016-November-Tuesday', Time='05:40 pm', Total CAS=0, Sex='Male', Age='022', Licence Type='Unknown'),
 Row(Date='2016-November-Tuesday', Time='05:40 pm', Total CAS=0, Sex='Male', Age='020', Licence Type='Unknown'),
 Row(Date='2016-November-Monday', Time='11:26 pm', Total CAS=0, Sex='Unknown', Age='XXX', Licence Type='Un

In [79]:
%%time
crashes_kv = crashes_rdd.filter(lambda x: x[2] == "ADELAIDE").map(lambda x: (x[0], x[1:]))
units_kv = units_rdd.map(lambda x: (x[0], x[1:]))
res_2 = crashes_kv.join(units_kv).mapValues(lambda x: [f"{x[0][9]}-{x[0][10]}-{x[0][11]}", x[0][12], x[0][5], x[1][9]])

CPU times: user 18.5 ms, sys: 4.59 ms, total: 23.1 ms
Wall time: 69.4 ms


In [80]:
# Expose both DataFrames to Spark SQL under the names c_sql (crashes) and u_sql (units).
crashes.createOrReplaceTempView("c_sql")
units.createOrReplaceTempView("u_sql")

In [85]:
%%time
res_3 = spark.sql('''
SELECT CONCAT(c.year,'-',c.month,'-',c.day) AS Date, c.Time, `Total Cas`, Sex, Age, `Licence Type`
FROM c_sql as c left join u_sql as u
WHERE c.REPORT_ID == u.REPORT_ID AND c.Suburb == "ADELAIDE"
''').collect()

CPU times: user 27.3 ms, sys: 5.4 ms, total: 32.7 ms
Wall time: 1.55 s


In [89]:
%%time
res_1 = units.filter(F.col("Licence Type") == "Unlicenced").join(crashes, crashes.REPORT_ID == units.REPORT_ID)\
.groupby("suburb").sum("Total Cas").show()

+-------------------+--------------+
|             suburb|sum(Total Cas)|
+-------------------+--------------+
|      FLINDERS PARK|             8|
|       POOGINAGORIC|             1|
|     TEA TREE GULLY|             1|
|            HACKHAM|             3|
|   MEDINDIE GARDENS|             0|
|           WISANGER|             1|
|            CUMMINS|             0|
|       BASKET RANGE|             0|
|MURRAY BRIDGE SOUTH|             0|
|      GILLES PLAINS|             7|
|             HAWKER|             0|
|           BEAUFORT|             1|
|             MAGILL|             7|
|            ECHUNGA|             1|
|            CULTANA|             1|
|        EDWARDSTOWN|             6|
|        RISDON PARK|             1|
|          THORNGATE|             1|
|       ANDREWS FARM|            12|
|       TORRENSVILLE|             5|
+-------------------+--------------+
only showing top 20 rows

CPU times: user 6.15 ms, sys: 0 ns, total: 6.15 ms
Wall time: 1.51 s


In [90]:
%%time
spark.sql('''
SELECT Suburb, sum(`Total Cas`)
FROM c_sql as c join u_sql as u
WHERE c.REPORT_ID == u.REPORT_ID AND `Licence Type` == "Unlicenced"
GROUP BY Suburb
''').show()

+-------------------+--------------+
|             Suburb|sum(Total Cas)|
+-------------------+--------------+
|      FLINDERS PARK|             8|
|       POOGINAGORIC|             1|
|     TEA TREE GULLY|             1|
|            HACKHAM|             3|
|   MEDINDIE GARDENS|             0|
|           WISANGER|             1|
|            CUMMINS|             0|
|       BASKET RANGE|             0|
|MURRAY BRIDGE SOUTH|             0|
|      GILLES PLAINS|             7|
|             HAWKER|             0|
|           BEAUFORT|             1|
|             MAGILL|             7|
|            ECHUNGA|             1|
|            CULTANA|             1|
|        EDWARDSTOWN|             6|
|        RISDON PARK|             1|
|          THORNGATE|             1|
|       ANDREWS FARM|            12|
|       TORRENSVILLE|             5|
+-------------------+--------------+
only showing top 20 rows

CPU times: user 2.87 ms, sys: 733 µs, total: 3.6 ms
Wall time: 1.43 s
