In [1]:
import os

import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so the
# notebook is not tied to a single machine's directory layout; fall back to the
# original install path when the variable is unset or empty.
SPARK_HOME = os.environ.get('SPARK_HOME') or '/home/ubuntu/spark-2.1.1-bin-hadoop2.7'
findspark.init(SPARK_HOME)

# pyspark only becomes importable after findspark.init() has run.
import pyspark
from pyspark.sql import SparkSession

# getOrCreate() returns the session already running in this kernel, if any.
spark = SparkSession.builder.appName('car-accidents').getOrCreate()

In [2]:
# Accidents arrive in three CSV parts; start from part 1 and append the rest.
# union() matches columns by position, so the parts must share one column layout.
dfAccidents = spark.read.csv('Datasets/Accidents0514-part1.csv', inferSchema=True, header=True)
for accidentsPart in ('Datasets/Accidents0514-part2.csv', 'Datasets/Accidents0514-part3.csv'):
    dfAccidents = dfAccidents.union(
        spark.read.csv(accidentsPart, inferSchema=True, header=True)
    )

# Vehicles arrive in two CSV parts.
dfVehicles = (
    spark.read.csv('Datasets/Vehicles0514-part1.csv', inferSchema=True, header=True)
    .union(spark.read.csv('Datasets/Vehicles0514-part2.csv', inferSchema=True, header=True))
)

# Casualties are a single file.
dfCasualties = spark.read.csv('Datasets/Casualties0514.csv', inferSchema=True, header=True)

# Only the combined frames are kept in the namespace.
del accidentsPart

In [3]:
# Row counts, one per line: accidents, vehicles, casualties.
for sourceFrame in (dfAccidents, dfVehicles, dfCasualties):
    print(sourceFrame.count())
del sourceFrame

1640597
3004425
2216720


In [4]:
from pyspark.sql.functions import isnan, when, count, col

# Build one aggregate per column: count() only counts the rows where when() produced
# a value, i.e. the rows in which that column is null or NaN.
missingCounts = dfAccidents.select([
    count(when(col(name).isNull() | isnan(name), name)).alias(name)
    for name in dfAccidents.columns
])
missingCounts.show()

+--------------+---------------------+----------------------+---------+--------+------------+-----------------+------------------+--------------------+----+-----------+----+--------------------------+-------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+
|Accident_Index|Location_Easting_OSGR|Location_Northing_OSGR|Longitude|Latitude|Police_Force|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|Date|Day_of_Week|Time|Local_Authority_(District)|Local_Authority_(Highway)|1st_Road_Class|1st_Road_Number|Road_Type|Speed_limit|Junction_Detail|Junction_Control|2nd_Road_Class|2nd_Road_Number|Pedestrian_Crossing-Human_Control|Pedestri

In [5]:
# The result is a single wide row; pull it into pandas and transpose it so that
# each column name is printed on its own line next to its missing-value count.
missingCountsPD = missingCounts.toPandas()
print(missingCountsPD.transpose())


                                                  0
Accident_Index                                    0
Location_Easting_OSGR                           111
Location_Northing_OSGR                          111
Longitude                                       111
Latitude                                        111
Police_Force                                      0
Accident_Severity                                 0
Number_of_Vehicles                                0
Number_of_Casualties                              0
Date                                              0
Day_of_Week                                       0
Time                                            133
Local_Authority_(District)                        0
Local_Authority_(Highway)                         0
1st_Road_Class                                    0
1st_Road_Number                                   0
Road_Type                                         0
Speed_limit                                       0
Junction_Det

In [6]:
# Select our data: the five columns later assembled into the feature vector, plus
# the Number_of_Casualties column used as the label.
# NOTE: dfSubset is taken from dfAccidents as loaded, before any rows are dropped.
# NOTE: "Road_type" differs in case from the "Road_Type" header of the accidents
# table; the select succeeds (see output) and the subset keeps the spelling used here.
dfSubset = dfAccidents.select([
    "Speed_limit",
    "Road_type",
    "Light_Conditions",
    "Weather_Conditions",
    "Road_Surface_Conditions",
    "Number_of_Casualties",
])
dfSubset.show(n=20)

+-----------+---------+----------------+------------------+-----------------------+--------------------+
|Speed_limit|Road_type|Light_Conditions|Weather_Conditions|Road_Surface_Conditions|Number_of_Casualties|
+-----------+---------+----------------+------------------+-----------------------+--------------------+
|         30|        6|               1|                 2|                      2|                   1|
|         30|        3|               4|                 1|                      1|                   1|
|         30|        6|               4|                 1|                      1|                   1|
|         30|        6|               1|                 1|                      1|                   1|
|         30|        6|               7|                 1|                      2|                   1|
|         30|        6|               1|                 2|                      2|                   1|
|         30|        6|               4|               

In [7]:
# Remove every row that has a missing value in any column.
# dropna() is the DataFrame-level alias of na.drop(); how='any' is its default.

dfRemoveMissing = dfAccidents.dropna(how='any')


In [8]:
# Row counts before and after dropping incomplete rows, then their difference.
originalCount, newCount = dfAccidents.count(), dfRemoveMissing.count()
for figure in (originalCount, newCount, originalCount - newCount):
    print(figure)
del figure

# Print missing values: recount null/NaN entries per column on the cleaned frame.
missingCounts = dfRemoveMissing.select([
    count(when(col(name).isNull() | isnan(name), name)).alias(name)
    for name in dfRemoveMissing.columns
])
missingCountsPD = missingCounts.toPandas()
print(missingCountsPD.transpose())

1640597
1519998
120599
                                             0
Accident_Index                               0
Location_Easting_OSGR                        0
Location_Northing_OSGR                       0
Longitude                                    0
Latitude                                     0
Police_Force                                 0
Accident_Severity                            0
Number_of_Vehicles                           0
Number_of_Casualties                         0
Date                                         0
Day_of_Week                                  0
Time                                         0
Local_Authority_(District)                   0
Local_Authority_(Highway)                    0
1st_Road_Class                               0
1st_Road_Number                              0
Road_Type                                    0
Speed_limit                                  0
Junction_Detail                              0
Junction_Control                     

In [9]:
# The missing-value diagnostics are finished; release their names.
del missingCounts
del missingCountsPD
del dfRemoveMissing


In [10]:
# Remove incomplete rows from both inputs before joining them.
dfAccidents = dfAccidents.na.drop()
dfCasualties = dfCasualties.na.drop()

# Attach each casualty record to its accident via the shared Accident_Index key.
# An inner join is sufficient here: both inputs are already free of nulls, so a
# full outer join would only add unmatched rows whose other-side columns are all
# null, and those rows would have to be dropped again straight away.
dfMerged = (
    dfCasualties
    .join(dfAccidents, ['Accident_Index'], how='inner')
    .orderBy('Accident_Index')
)


print(dfMerged.columns)
print("\naccidents record count :", dfAccidents.count())
print("casualties record count :", dfCasualties.count())
print("merged record count :", dfMerged.count())

print("\naccidents column count :", len(dfAccidents.columns))
print("casualties column count :", len(dfCasualties.columns))
print("merged column count :", len(dfMerged.columns))


['Accident_Index', 'Vehicle_Reference', 'Casualty_Reference', 'Casualty_Class', 'Sex_of_Casualty', 'Age_of_Casualty', 'Age_Band_of_Casualty', 'Casualty_Severity', 'Pedestrian_Location', 'Pedestrian_Movement', 'Car_Passenger', 'Bus_or_Coach_Passenger', 'Pedestrian_Road_Maintenance_Worker', 'Casualty_Type', 'Casualty_Home_Area_Type', 'Location_Easting_OSGR', 'Location_Northing_OSGR', 'Longitude', 'Latitude', 'Police_Force', 'Accident_Severity', 'Number_of_Vehicles', 'Number_of_Casualties', 'Date', 'Day_of_Week', 'Time', 'Local_Authority_(District)', 'Local_Authority_(Highway)', '1st_Road_Class', '1st_Road_Number', 'Road_Type', 'Speed_limit', 'Junction_Detail', 'Junction_Control', '2nd_Road_Class', '2nd_Road_Number', 'Pedestrian_Crossing-Human_Control', 'Pedestrian_Crossing-Physical_Facilities', 'Light_Conditions', 'Weather_Conditions', 'Road_Surface_Conditions', 'Special_Conditions_at_Site', 'Carriageway_Hazards', 'Urban_or_Rural_Area', 'Did_Police_Officer_Attend_Scene_of_Accident', 'LSO

In [11]:
# Preview the first 20 rows of the merged casualty/accident table.
dfMerged.show(n=20)

+--------------+-----------------+------------------+--------------+---------------+---------------+--------------------+-----------------+-------------------+-------------------+-------------+----------------------+----------------------------------+-------------+-----------------------+---------------------+----------------------+--------------------+---------+------------+-----------------+------------------+--------------------+----------+-----------+-----+--------------------------+-------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+
|Accident_Index|Vehicle_Reference|Casualty_Reference|Casualty_Class|Sex_of_Casualty|Age_of_Ca

In [12]:
# Lowest speed limits first.
dfSorted = dfMerged.sort('Speed_limit', ascending=True)
dfSorted.select('Speed_limit').show(n=20)

+-----------+
|Speed_limit|
+-----------+
|         10|
|         10|
|         10|
|         10|
|         10|
|         15|
|         15|
|         15|
|         20|
|         20|
|         20|
|         20|
|         20|
|         20|
|         20|
|         20|
|         20|
|         20|
|         20|
|         20|
+-----------+
only showing top 20 rows



In [13]:
# Highest speed limits first.
dfSorted = dfMerged.sort('Speed_limit', ascending=False)
dfSorted.select('Speed_limit').show(n=20)

+-----------+
|Speed_limit|
+-----------+
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
+-----------+
only showing top 20 rows



In [14]:
# Same kind of filter expressed in SQL: rows whose speed limit is at least 70.
highSpeedQuery = "SELECT Speed_limit FROM dfAccidents WHERE Speed_limit >= 70"

# Register the frame under the table name that the query refers to.
dfAccidents.createOrReplaceTempView('dfAccidents')
results = spark.sql(highSpeedQuery)
results.show(n=20)

+-----------+
|Speed_limit|
+-----------+
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
|         70|
+-----------+
only showing top 20 rows



In [15]:
# Derive a boolean 'reckless' column: True only when Speed_limit is at least 70 and
# both Light_Conditions and Weather_Conditions differ from code 1. The result is
# only displayed (25 rows, highest speed limit first); dfAccidents is not modified.
(
    dfAccidents
    .withColumn(
        'reckless',
        (col('Speed_limit') >= 70)
        & (col('Light_Conditions') != 1)
        & (col('Weather_Conditions') != 1)
    )
    .select('Speed_limit', 'Light_Conditions', 'Weather_Conditions', 'reckless')
    .orderBy(col('Speed_limit').desc())
    .show(25)
)

+-----------+----------------+------------------+--------+
|Speed_limit|Light_Conditions|Weather_Conditions|reckless|
+-----------+----------------+------------------+--------+
|         70|               1|                 1|   false|
|         70|               4|                 1|   false|
|         70|               1|                 1|   false|
|         70|               1|                 2|   false|
|         70|               1|                 1|   false|
|         70|               1|                 2|   false|
|         70|               1|                 1|   false|
|         70|               1|                 1|   false|
|         70|               1|                 2|   false|
|         70|               1|                 1|   false|
|         70|               1|                 1|   false|
|         70|               4|                 1|   false|
|         70|               1|                 1|   false|
|         70|               1|                 1|   fals

In [16]:
# Summary statistics (count, mean, stddev, min, max) for the casualty count.
dfSubset.describe("Number_of_Casualties").show()

+-------+--------------------+
|summary|Number_of_Casualties|
+-------+--------------------+
|  count|             1640597|
|   mean|    1.35116667895894|
| stddev|  0.8260983324873732|
|    min|                   1|
|    max|                  93|
+-------+--------------------+



In [17]:
# Preview the first 20 rows of the modelling subset.
dfSubset.show(n=20)

+-----------+---------+----------------+------------------+-----------------------+--------------------+
|Speed_limit|Road_type|Light_Conditions|Weather_Conditions|Road_Surface_Conditions|Number_of_Casualties|
+-----------+---------+----------------+------------------+-----------------------+--------------------+
|         30|        6|               1|                 2|                      2|                   1|
|         30|        3|               4|                 1|                      1|                   1|
|         30|        6|               4|                 1|                      1|                   1|
|         30|        6|               1|                 1|                      1|                   1|
|         30|        6|               7|                 1|                      2|                   1|
|         30|        6|               1|                 2|                      2|                   1|
|         30|        6|               4|               

In [18]:
from pyspark.sql import Window
from pyspark.sql.functions import percent_rank, when

# Rank every row's casualty count against the other rows that share its speed limit.
subsetWindow = Window.partitionBy("Speed_limit").orderBy("Number_of_Casualties")
percentiles_df = dfSubset.withColumn("percentile", percent_rank().over(subsetWindow))

# Keep the casualty count only where the percentile is at most 0.95. when() without
# otherwise() yields null for the remaining rows, and na.drop() then removes them,
# so despite its name dfOutliers holds the rows that survive the trimming.
dfOutliers = percentiles_df.select(
    "Speed_limit",
    when(col("percentile") <= 0.95, col("Number_of_Casualties")).alias("Number_of_Casualties"),
).na.drop()

dfOutliers.describe("Number_of_Casualties").show()

+-------+--------------------+
|summary|Number_of_Casualties|
+-------+--------------------+
|  count|             1600219|
|   mean|  1.2651274606788196|
| stddev|  0.5494648968159794|
|    min|                   1|
|    max|                   4|
+-------+--------------------+



In [19]:
# dfSubset itself is unchanged by the percentile trimming; preview its first 20 rows.
dfSubset.show(n=20)

+-----------+---------+----------------+------------------+-----------------------+--------------------+
|Speed_limit|Road_type|Light_Conditions|Weather_Conditions|Road_Surface_Conditions|Number_of_Casualties|
+-----------+---------+----------------+------------------+-----------------------+--------------------+
|         30|        6|               1|                 2|                      2|                   1|
|         30|        3|               4|                 1|                      1|                   1|
|         30|        6|               4|                 1|                      1|                   1|
|         30|        6|               1|                 1|                      1|                   1|
|         30|        6|               7|                 1|                      2|                   1|
|         30|        6|               1|                 2|                      2|                   1|
|         30|        6|               4|               

In [20]:
from pyspark.sql.functions import log

# Replace the raw casualty count with its natural logarithm: append the log column,
# then drop the original so only the transformed value remains.
dfTransformed = (
    dfSubset
    .withColumn("log_Number_of_Casualties", log(dfSubset["Number_of_Casualties"]))
    .drop("Number_of_Casualties")
)


In [21]:
# Export the data before and after the log transform as comma-separated files with
# a header row, replacing any previous export at the same path.
for frame, target in ((dfSubset, 'before.csv'), (dfTransformed, 'after.csv')):
    writer = frame.write.format('csv').mode('overwrite')
    writer.option('header', True).option('sep', ',').save(target)
del frame, target, writer

In [22]:
# Summary statistics for the log-transformed casualty count.
dfTransformed.describe("log_Number_of_Casualties").show()

+-------+------------------------+
|summary|log_Number_of_Casualties|
+-------+------------------------+
|  count|                 1640597|
|   mean|     0.20215798771110421|
| stddev|      0.3927618282073853|
|    min|                     0.0|
|    max|       4.532599493153256|
+-------+------------------------+



In [23]:
# Release everything the modelling cells do not use; only dfSubset is carried forward.
# Transformation and outlier-trimming intermediates:
del dfTransformed
del dfOutliers
del subsetWindow
del percentiles_df
# Source tables and the sorted view:
del dfAccidents
del dfVehicles
del dfCasualties
del dfSorted

In [24]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Pack the five predictor columns into a single vector column named "features".
# The order given here fixes the index of each feature inside that vector.
assembler = (
    VectorAssembler()
    .setInputCols([
        "Speed_limit",
        "Road_type",
        "Light_Conditions",
        "Weather_Conditions",
        "Road_Surface_Conditions",
    ])
    .setOutputCol("features")
)

# Keep only what the model needs: the feature vector and the label column.
output = assembler.transform(dfSubset)
final_data = output.select("features", "Number_of_Casualties")


In [25]:
# Split the rows roughly 50/50 into training and test sets. The seed is fixed so
# that re-running the notebook gives the same split, and therefore the same model
# and metrics; without it every run samples differently.
SPLIT_SEED = 42
train_data, test_data = final_data.randomSplit([0.5, 0.5], seed=SPLIT_SEED)

In [26]:
# Summary statistics for the training split, then the test split.
for split in (train_data, test_data):
    split.describe().show()
del split


+-------+--------------------+
|summary|Number_of_Casualties|
+-------+--------------------+
|  count|              822042|
|   mean|  1.3514005367122361|
| stddev|  0.8269504664760282|
|    min|                   1|
|    max|                  93|
+-------+--------------------+

+-------+--------------------+
|summary|Number_of_Casualties|
+-------+--------------------+
|  count|              818555|
|   mean|   1.350931824984271|
| stddev|  0.8252421213539887|
|    min|                   1|
|    max|                  87|
+-------+--------------------+



In [27]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Decision-tree classifier that treats Number_of_Casualties as the class label.
# maxBins and maxDepth are passed to the constructor rather than through setters.
decisionTree = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='Number_of_Casualties',
    maxBins=10,
    maxDepth=5
)

In [28]:
# Train the decision tree on the training split only.
model = decisionTree.fit(train_data)
# Score the held-out test split; transform() adds the rawPrediction, probability
# and prediction columns shown in the following cell's output.
predictions = model.transform(test_data)


In [36]:
# Preview the first 20 scored test rows: label next to the model's prediction.
predictions.show(n=20)

+--------------------+--------------------+--------------------+--------------------+----------+
|            features|Number_of_Casualties|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+--------------------+----------+
|[10.0,2.0,1.0,1.0...|                   1|[0.0,572.0,43.0,5...|[0.0,0.9181380417...|       1.0|
|[10.0,2.0,1.0,1.0...|                   1|[0.0,572.0,43.0,5...|[0.0,0.9181380417...|       1.0|
|[10.0,2.0,1.0,1.0...|                   1|[0.0,572.0,43.0,5...|[0.0,0.9181380417...|       1.0|
|[10.0,6.0,1.0,1.0...|                   1|[0.0,4351.0,425.0...|[0.0,0.8843495934...|       1.0|
|[10.0,6.0,1.0,1.0...|                   1|[0.0,4351.0,425.0...|[0.0,0.8843495934...|       1.0|
|[15.0,2.0,1.0,1.0...|                   1|[0.0,572.0,43.0,5...|[0.0,0.9181380417...|       1.0|
|[15.0,6.0,1.0,1.0...|                   1|[0.0,4351.0,425.0...|[0.0,0.8843495934...|       1.0|
|[15.0,6.0,1.0,1.0...|        

In [29]:
# Accuracy of the predictions on the test split.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="Number_of_Casualties",
    predictionCol="prediction"
)
accuracy = evaluator.evaluate(predictions)

# Reported as a percentage with two decimals.
print("Decision tree accuracy: {0:2.2f}%".format(accuracy*100))


Decision tree accuracy: 76.66%


In [33]:
# F1 score of the predictions on the test split.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="Number_of_Casualties",
    predictionCol="prediction"
)
f1Score = evaluator.evaluate(predictions)

print("Decision tree f1 score: ", f1Score)



Decision tree f1 score:  0.6653775326578529


In [34]:
# Weighted precision of the predictions on the test split.
evaluator = MulticlassClassificationEvaluator(
    metricName="weightedPrecision",
    labelCol="Number_of_Casualties",
    predictionCol="prediction"
)
precision = evaluator.evaluate(predictions)

print("Decision tree precision: ", precision)



Decision tree precision:  0.5877424993365491


In [35]:
# Weighted recall of the predictions on the test split.
evaluator = MulticlassClassificationEvaluator(
    metricName="weightedRecall",
    labelCol="Number_of_Casualties",
    predictionCol="prediction"
)
recall = evaluator.evaluate(predictions)

print("Decision tree recall: ", recall)


Decision tree recall:  0.7666436586423637


In [39]:
# Per-feature importance of the fitted tree. Indices follow the assembler's inputCols
# order: 0 = Speed_limit, 1 = Road_type, 2 = Light_Conditions,
# 3 = Weather_Conditions, 4 = Road_Surface_Conditions.
model.featureImportances


SparseVector(5, {0: 0.8472, 1: 0.0759, 2: 0.0121, 3: 0.0118, 4: 0.0531})