In [14]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
import pandas as pd
from pathlib import Path
import numpy as np

conf = SparkConf()
conf.set("spark.executor.memory", "40g")
conf.set("spark.executor.cores", "2")

spark = SparkSession.builder \
        .appName("spark-app") \
        .config(conf=conf) \
        .master("spark://spark-master:7077") \
        .getOrCreate()

In [2]:
# Download Data
from pathlib import Path
import os

data_dir = Path(os.path.abspath("")).parent / "data"
data_folder_name = "anz_road_crash/anz_crash_20200903_fix/anz_crash_20200903_fix"
filenames = iter([
    "Casualties.csv",
    "Crash2.csv",
    "DateTime.csv",
    "Description.csv",
    "Location2.csv",
    "Vehicles.csv"
])

In [4]:
# Create temp tables for each files
for file in filenames:
    data_path = data_dir / data_folder_name / file
    df = spark.read.csv(str(data_path), header=True)
    df.createOrReplaceTempView(file.replace(".csv", ""))

In [8]:
from pyspark import SQLContext

sqlContext = SQLContext(sparkContext=spark.sparkContext)
tables = sqlContext.tableNames()
print(tables)

['casualties', 'crash2', 'datetime', 'description', 'location2', 'vehicles']


In [10]:
results = spark.sql(
"""
select speed_limit, count(speed_limit) as number
from description
group by speed_limit
"""
)

In [48]:
results.replace(r"/[0-9]*.0/", "15").show(300)
# take note of these, make the values convertible to a float

                                                                                

+--------------+------+
|   speed_limit|number|
+--------------+------+
|            15|    11|
|       70 km/h| 20035|
|          50.0|412706|
|          90.0|   250|
|          20.0|  1694|
|            30|   269|
|          15.0|     5|
|           888|   838|
|100 - 110 km/h| 55585|
|           110|  6881|
|          null|     0|
|         110.0|     5|
|             5|     3|
|           100| 36336|
|            70| 17910|
|           999|  9615|
|            75|    61|
|          10.0|   470|
|       60 km/h|170121|
|  80 - 90 km/h| 30194|
|          70.0| 21554|
|            60|118725|
|            90|  2794|
|           5.0|     2|
|         100.0|185463|
|            40|  9888|
|            25|   437|
|   0 - 50 km/h| 52308|
|           777|   211|
|          60.0| 16210|
|          30.0|  5181|
|            20|    46|
|            10|    33|
|          80.0| 29926|
|            80| 35741|
|            50| 64698|
|          40.0|   845|
+--------------+------+



In [60]:
df_pandas = spark.sql(
"""
select *
from description
"""
).toPandas()
df_pandas = df_pandas.drop(columns=df_pandas.columns[0], axis=0)

                                                                                

In [61]:
df_pandas.head()

Unnamed: 0,description_id,severity,speed_limit,midblock,intersection,road_position_horizontal,road_position_vertical,road_sealed,road_wet,weather,crash_type,lighting,traffic_controls,drugs_alcohol,DCA_code,comment
0,0,property_damage,60,True,False,straight,level,True,False,fine,Right Angle,daylight,none,,,
1,1,property_damage,40,True,False,straight,level,True,False,fine,Hit Parked Vehicle,daylight,none,,,
2,2,property_damage,100,True,False,straight,slope,True,False,fine,Other,daylight,none,,,
3,3,property_damage,60,False,True,straight,level,True,False,fine,Rear End,daylight,stop_sign,,,
4,4,property_damage,60,False,True,straight,level,True,False,fine,Rear End,daylight,traffic_lights,,,


In [72]:
# fill = df_pandas.fillna(0)
# fill[fill["speed_limit"].apply(lambda x: isinstance(x, (str)))]
fill[pd.to_numeric(fill["speed_limit"], errors='coerce').notnull()]

Unnamed: 0,description_id,severity,speed_limit,midblock,intersection,road_position_horizontal,road_position_vertical,road_sealed,road_wet,weather,crash_type,lighting,traffic_controls,drugs_alcohol,DCA_code,comment
0,0,property_damage,60,True,False,straight,level,True,False,fine,Right Angle,daylight,none,0,0,0
1,1,property_damage,40,True,False,straight,level,True,False,fine,Hit Parked Vehicle,daylight,none,0,0,0
2,2,property_damage,100,True,False,straight,slope,True,False,fine,Other,daylight,none,0,0,0
3,3,property_damage,60,False,True,straight,level,True,False,fine,Rear End,daylight,stop_sign,0,0,0
4,4,property_damage,60,False,True,straight,level,True,False,fine,Rear End,daylight,traffic_lights,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1519450,1519450,serious_injury,0,True,False,0,0,True,False,fine,0,darkness_lit,0,0,0,0
1519451,1519451,property_damage,0,False,True,0,0,True,False,fine,0,daylight,0,0,0,0
1519452,1519452,property_damage,0,False,True,0,0,True,False,fine,0,daylight,0,0,0,0
1519453,1519453,property_damage,0,True,False,0,0,True,False,fine,0,daylight,0,0,0,0


AttributeError: 'NoneType' object has no attribute 'isnumeric'

In [3]:
data_path = data_dir / data_folder_name / next(filenames)
print(data_path)
df_casualties = spark.read.csv(str(data_path), header=True)
df_casualties.show()

                                                                                

+----+-------------+----------+----------+----------------+--------------+
| _c0|casualties_id|casualties|fatalities|serious_injuries|minor_injuries|
+----+-------------+----------+----------+----------------+--------------+
|   0|           0c|       0.0|       0.0|             0.0|           0.0|
|  16|         1c1m|       1.0|       0.0|             0.0|           1.0|
|  57|         2c2m|       2.0|       0.0|             0.0|           2.0|
| 270|         1c1s|       1.0|       0.0|             1.0|           0.0|
| 573|         1c1f|       1.0|       1.0|             0.0|           0.0|
| 649|         4c4m|       4.0|       0.0|             0.0|           4.0|
| 670|       2c1s1m|       2.0|       0.0|             1.0|           1.0|
| 882|       3c1f2s|       3.0|       1.0|             2.0|           0.0|
| 889|       8c1f7s|       8.0|       1.0|             7.0|           0.0|
| 956|         3c3m|       3.0|       0.0|             0.0|           3.0|
|1413|         2c2s|     

In [5]:
print(df_casualties.columns)
print(df_casualties.schema)
print(df_casualties.count())

['_c0', 'casualties_id', 'casualties', 'fatalities', 'serious_injuries', 'minor_injuries']
StructType(List(StructField(_c0,StringType,true),StructField(casualties_id,StringType,true),StructField(casualties,StringType,true),StructField(fatalities,StringType,true),StructField(serious_injuries,StringType,true),StructField(minor_injuries,StringType,true)))
259


In [6]:
data_path = data_dir / data_folder_name / next(filenames)
print(data_path)

/opt/workspace/data/anz_road_crash/anz_crash_20200903_fix/anz_crash_20200903_fix/Crash2.csv


In [7]:
df_crash = spark.read.csv(str(data_path), header=True)
df_crash.show()

+--------------------+--------------------+------------+--------------+-----------+-------------+
|            crash_id|            lat_long|date_time_id|description_id|vehicles_id|casualties_id|
+--------------------+--------------------+------------+--------------+-----------+-------------+
| SA2012-1-21/08/2019|(-34.914968707994...|2012-1--7-16|             0|       1c1b|           0c|
| SA2012-2-21/08/2019|(-34.945411892314...| 2012-1--7-9|             1|         2c|           0c|
| SA2012-3-21/08/2019|(-35.348782706688...|2012-1--3-11|             2|       2c1i|           0c|
| SA2012-4-21/08/2019|(-34.910890136635...|2012-1--3-10|             3|         2w|           0c|
| SA2012-5-21/08/2019|(-34.905235082470...|2012-1--3-15|             4|       1c1w|           0c|
| SA2012-6-21/08/2019|(-34.978174108493...|2012-1--5-11|             5|         4c|           0c|
| SA2012-7-21/08/2019|(-34.902922403654...|2012-1--4-13|             6|         2c|           0c|
| SA2012-8-21/08/201

In [8]:
df_crash.count()

                                                                                

1519455

In [9]:
data_path = data_dir / data_folder_name / next(filenames)
print(data_path)

/opt/workspace/data/anz_road_crash/anz_crash_20200903_fix/anz_crash_20200903_fix/DateTime.csv


In [10]:
df_datetime = spark.read.csv(str(data_path), header=True)
df_datetime.show()

+---+------------+----+-----+-----------+------------+----+-----------+
|_c0|date_time_id|year|month|day_of_week|day_of_month|hour|approximate|
+---+------------+----+-----+-----------+------------+----+-----------+
|  0|2012-1--7-16|2012|  1.0|        7.0|        null|16.0|       True|
|  1| 2012-1--7-9|2012|  1.0|        7.0|        null| 9.0|       True|
|  2|2012-1--3-11|2012|  1.0|        3.0|        null|11.0|       True|
|  3|2012-1--3-10|2012|  1.0|        3.0|        null|10.0|       True|
|  4|2012-1--3-15|2012|  1.0|        3.0|        null|15.0|       True|
|  5|2012-1--5-11|2012|  1.0|        5.0|        null|11.0|       True|
|  6|2012-1--4-13|2012|  1.0|        4.0|        null|13.0|       True|
|  7|2012-1--5-16|2012|  1.0|        5.0|        null|16.0|       True|
|  8|2012-1--6-10|2012|  1.0|        6.0|        null|10.0|       True|
|  9|2012-1--6-15|2012|  1.0|        6.0|        null|15.0|       True|
| 10|2012-1--1-13|2012|  1.0|        1.0|        null|13.0|     

In [11]:
df_datetime.count()

127505

In [12]:
data_path = data_dir / data_folder_name / next(filenames)
print(data_path)

/opt/workspace/data/anz_road_crash/anz_crash_20200903_fix/anz_crash_20200903_fix/Description.csv


In [13]:
df_description = spark.read.csv(str(data_path), header=True)
df_description.show()

+---+--------------+---------------+-----------+--------+------------+------------------------+----------------------+-----------+--------+-------+------------------+--------+----------------+-------------+--------+-------+
|_c0|description_id|       severity|speed_limit|midblock|intersection|road_position_horizontal|road_position_vertical|road_sealed|road_wet|weather|        crash_type|lighting|traffic_controls|drugs_alcohol|DCA_code|comment|
+---+--------------+---------------+-----------+--------+------------+------------------------+----------------------+-----------+--------+-------+------------------+--------+----------------+-------------+--------+-------+
|  0|             0|property_damage|         60|    True|       False|                straight|                 level|       True|   False|   fine|       Right Angle|daylight|            none|         null|    null|   null|
|  1|             1|property_damage|         40|    True|       False|                straight|         

In [24]:
print(df_description.count())

df_description.createOrReplaceTempView("descriptions")
results = spark.sql(
"""
select description_id, drugs_alcohol
from descriptions
where drugs_alcohol is not null
-- limit 100
"""
)
results.take(50)

1519455


[Row(description_id='575', drugs_alcohol='Y'),
 Row(description_id='602', drugs_alcohol='Y'),
 Row(description_id='603', drugs_alcohol='Y'),
 Row(description_id='605', drugs_alcohol='Y'),
 Row(description_id='612', drugs_alcohol='Y'),
 Row(description_id='618', drugs_alcohol='Y'),
 Row(description_id='627', drugs_alcohol='Y'),
 Row(description_id='628', drugs_alcohol='Y'),
 Row(description_id='634', drugs_alcohol='Y'),
 Row(description_id='655', drugs_alcohol='Y'),
 Row(description_id='686', drugs_alcohol='Y'),
 Row(description_id='691', drugs_alcohol='Y'),
 Row(description_id='731', drugs_alcohol='Y'),
 Row(description_id='791', drugs_alcohol='Y'),
 Row(description_id='822', drugs_alcohol='Y'),
 Row(description_id='941', drugs_alcohol='Y'),
 Row(description_id='966', drugs_alcohol='Y'),
 Row(description_id='968', drugs_alcohol='Y'),
 Row(description_id='983', drugs_alcohol='Y'),
 Row(description_id='985', drugs_alcohol='Y'),
 Row(description_id='986', drugs_alcohol='Y'),
 Row(descript

In [18]:
data_path = data_dir / data_folder_name / next(filenames)
print(data_path)

/opt/workspace/data/anz_road_crash/anz_crash_20200903_fix/anz_crash_20200903_fix/Location2.csv


In [19]:
df_location = spark.read.csv(str(data_path), header=True)
df_location.show()

+--------------------+-------------------+------------------+-------+-----+---------------------+----------------+---------------+
|            lat_long|           latitude|         longitude|country|state|local_government_area|statistical_area|         suburb|
+--------------------+-------------------+------------------+-------+-----+---------------------+----------------+---------------+
|(-34.914968707994...|-34.914968707994774|138.62326191400015|     AU|   SA| CC OF NORWOOD,PAY...|            null|        STEPNEY|
|(-34.945411892314...|-34.945411892314496| 138.6106907387375|     AU|   SA|        CITY OF UNLEY|            null|       PARKSIDE|
|(-35.348782706688...|-35.348782706688546| 138.4547384995269|     AU|   SA|  CITY OF ONKAPARINGA|            null| SELLICKS BEACH|
|(-34.910890136635...| -34.91089013663556|138.56464045685533|     AU|   SA| CITY OF CHARLES S...|            null|      HINDMARSH|
|(-34.905235082470...| -34.90523508247097|138.57241145966046|     AU|   SA| CITY OF

In [18]:
df_location.count()

918263

In [20]:
data_path = data_dir / data_folder_name / next(filenames)
print(data_path)

/opt/workspace/data/anz_road_crash/anz_crash_20200903_fix/anz_crash_20200903_fix/Vehicles.csv


In [21]:
df_vehicle = spark.read.csv(str(data_path), header=True)
df_vehicle.show()

+---+-----------+-------+---------+-----------+-------+-------+-----------------+-----------+-----------+-----------+---+----+-------+-------+----------+---------+-----+----+-------------+
|_c0|vehicles_id|animals|car_sedan|car_utility|car_van|car_4x4|car_station_wagon|motor_cycle|truck_small|truck_large|bus|taxi|bicycle|scooter|pedestrian|inanimate|train|tram|vehicle_other|
+---+-----------+-------+---------+-----------+-------+-------+-----------------+-----------+-----------+-----------+---+----+-------+-------+----------+---------+-----+----+-------------+
|  0|       1c1b|    0.0|      1.0|        0.0|    0.0|    0.0|              0.0|        0.0|        0.0|        0.0|0.0| 0.0|    1.0|    0.0|       0.0|      0.0|  0.0| 0.0|          0.0|
|  1|         2c|    0.0|      2.0|        0.0|    0.0|    0.0|              0.0|        0.0|        0.0|        0.0|0.0| 0.0|    0.0|    0.0|       0.0|      0.0|  0.0| 0.0|          0.0|
|  2|       2c1i|    0.0|      2.0|        0.0|    0.0|

In [21]:
df_vehicle.count()

2434

In [22]:
df_casualties.createOrReplaceTempView("Casualties")
df_crash.createOrReplaceTempView("crash")
df_datetime.createOrReplaceTempView("Datetime")
df_description.createOrReplaceTempView("Description")
df_location.createOrReplaceTempView("Location")
df_vehicle.createOrReplaceTempView("Vehicle")

In [47]:
data_path = Path(os.path.abspath("")).parent / "data" / "anz_road_crash" / "anz_crash_20200903_fix/anz_crash_20200903_fix" / "Description.csv"
df = pd.read_csv(data_path)
df = df.drop(columns=df.columns[0], index=0)
df.head()

  df = pd.read_csv(data_path)


Unnamed: 0,description_id,severity,speed_limit,midblock,intersection,road_position_horizontal,road_position_vertical,road_sealed,road_wet,weather,crash_type,lighting,traffic_controls,drugs_alcohol,DCA_code,comment
1,1,property_damage,40,True,False,straight,level,True,False,fine,Hit Parked Vehicle,daylight,none,,,
2,2,property_damage,100,True,False,straight,slope,True,False,fine,Other,daylight,none,,,
3,3,property_damage,60,False,True,straight,level,True,False,fine,Rear End,daylight,stop_sign,,,
4,4,property_damage,60,False,True,straight,level,True,False,fine,Rear End,daylight,traffic_lights,,,
5,5,property_damage,60,False,True,straight,level,True,False,fine,Rear End,daylight,traffic_lights,,,


In [56]:
speed_limit = df["speed_limit"].astype('string')
print(speed_limit)

1            40
2           100
3            60
4            60
5            60
           ... 
1519450    <NA>
1519451    <NA>
1519452    <NA>
1519453    <NA>
1519454    <NA>
Name: speed_limit, Length: 1519454, dtype: string


In [59]:
res = speed_limit.filter(like='-')
print(res)

Series([], Name: speed_limit, dtype: string)


In [27]:
df_crash.schema

StructType(List(StructField(crash_id,StringType,true),StructField(lat_long,StringType,true),StructField(date_time_id,StringType,true),StructField(description_id,StringType,true),StructField(vehicles_id,StringType,true),StructField(casualties_id,StringType,true)))

In [28]:
df_datetime.schema

StructType(List(StructField(_c0,StringType,true),StructField(date_time_id,StringType,true),StructField(year,StringType,true),StructField(month,StringType,true),StructField(day_of_week,StringType,true),StructField(day_of_month,StringType,true),StructField(hour,StringType,true),StructField(approximate,StringType,true)))

In [29]:
df_description.schema

StructType(List(StructField(_c0,StringType,true),StructField(description_id,StringType,true),StructField(severity,StringType,true),StructField(speed_limit,StringType,true),StructField(midblock,StringType,true),StructField(intersection,StringType,true),StructField(road_position_horizontal,StringType,true),StructField(road_position_vertical,StringType,true),StructField(road_sealed,StringType,true),StructField(road_wet,StringType,true),StructField(weather,StringType,true),StructField(crash_type,StringType,true),StructField(lighting,StringType,true),StructField(traffic_controls,StringType,true),StructField(drugs_alcohol,StringType,true),StructField(DCA_code,StringType,true),StructField(comment,StringType,true)))

In [30]:
df_location.schema

StructType(List(StructField(lat_long,StringType,true),StructField(latitude,StringType,true),StructField(longitude,StringType,true),StructField(country,StringType,true),StructField(state,StringType,true),StructField(local_government_area,StringType,true),StructField(statistical_area,StringType,true),StructField(suburb,StringType,true)))

In [31]:
df_vehicle.schema

StructType(List(StructField(_c0,StringType,true),StructField(vehicles_id,StringType,true),StructField(animals,StringType,true),StructField(car_sedan,StringType,true),StructField(car_utility,StringType,true),StructField(car_van,StringType,true),StructField(car_4x4,StringType,true),StructField(car_station_wagon,StringType,true),StructField(motor_cycle,StringType,true),StructField(truck_small,StringType,true),StructField(truck_large,StringType,true),StructField(bus,StringType,true),StructField(taxi,StringType,true),StructField(bicycle,StringType,true),StructField(scooter,StringType,true),StructField(pedestrian,StringType,true),StructField(inanimate,StringType,true),StructField(train,StringType,true),StructField(tram,StringType,true),StructField(vehicle_other,StringType,true)))

In [None]:
schema_casualty = 