In [106]:
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession, HiveContext
from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier,LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [6]:
conf=SparkConf().setAppName('Predicting_Accident_Prone_Areas')
sc=SparkContext(conf=conf)

In [7]:
ardd=sc.textFile('Accidents_2015.csv')

In [9]:
for i in ardd.take(10):
    print(i)

Accident_Index,Location_Easting_OSGR,Location_Northing_OSGR,Longitude,Latitude,Police_Force,Accident_Severity,Number_of_Vehicles,Number_of_Casualties,Date,Day_of_Week,Time,Local_Authority_(District),Local_Authority_(Highway),1st_Road_Class,1st_Road_Number,Road_Type,Speed_limit,Junction_Detail,Junction_Control,2nd_Road_Class,2nd_Road_Number,Pedestrian_Crossing-Human_Control,Pedestrian_Crossing-Physical_Facilities,Light_Conditions,Weather_Conditions,Road_Surface_Conditions,Special_Conditions_at_Site,Carriageway_Hazards,Urban_or_Rural_Area,Did_Police_Officer_Attend_Scene_of_Accident,LSOA_of_Accident_Location
201501BS70001,525130,180050,-0.198465,51.505538,1,3,1,1,12/01/2015,2,18:45,12,E09000020,5,0,6,30,3,4,6,0,0,0,4,1,1,0,0,1,1,E01002825
201501BS70002,526530,178560,-0.178838,51.491836,1,3,1,1,12/01/2015,2,07:50,12,E09000020,6,0,6,30,3,4,3,3218,0,0,1,1,1,0,0,1,1,E01002820
201501BS70004,524610,181080,-0.205590,51.514910,1,3,1,1,12/01/2015,2,18:08,12,E09000020,4,415,6,30,2,4,6,0,0,1,4,2,2,0

In [10]:
header=ardd.first()

In [11]:
arddfilter=ardd.filter(lambda line:line!=header)
arddfinal=arddfilter.filter(lambda row:row.split(','))

#### Some basic EDA

In [12]:
#Display Number of accidents based on severity
arddsev=arddfinal.map(lambda x:(x[6],1)).reduceByKey(lambda acc,val:acc+val)
arddsev1=arddsev.map(lambda x:(x[1],x[0]))
arddsev2=arddsev1.sortByKey(0)
for i in arddsev2.take(20):
    print(i)

(42890, '0')
(16290, '1')
(5774, 'E')
(5728, 'S')
(5408, '2')
(5299, '5')
(4813, 'K')
(4650, 'W')
(4594, 'C')
(3988, 'N')
(3542, 'P')
(3202, 'I')
(3085, 'A')
(2958, 'B')
(2686, 'G')
(2563, 'J')
(2358, 'T')
(2087, 'R')
(2086, 'Q')
(2002, 'L')


In [15]:
#Display Number of Accident and their severity during daytime
arddsevday=arddfinal.filter(lambda x:x[24]=="1")
arddsevday1=arddsevday.map(lambda x:(x[24],1)).reduceByKey(lambda acc,val:acc+val)
arddsevday2=arddsevday1.map(lambda x:(x[1],x[0]))
arddsevday3=arddsevday2.sortByKey(0)
for i in arddsevday3.take(20):
    print(i)

(13926, '1')


In [17]:
#Display Number of Accidents and their severity during Nighttime
arddnight=arddfinal.filter(lambda x:x[24]=="4")
arddnight1=arddnight.map(lambda x:(x[24],1)).reduceByKey(lambda acc,val:acc+val)
arddnight2=arddnight1.map(lambda x:(x[1],x[0]))
arddnight3=arddnight2.sortByKey(0)
for i in arddnight3.take(10):
    print(i)

(14220, '4')


#### Creating spark dataframe 

In [20]:
sparksession=SparkSession.builder.appName('Predicting_Accident_Prone_Areas').config('"spark.some.config.option","some-value"').getOrCreate()

In [23]:
df=sparksession.read.csv('Accidents_2015.csv',header=True,inferSchema=True)

In [24]:
print(df.show(20))

+--------------+---------------------+----------------------+---------+---------+------------+-----------------+------------------+--------------------+----------+-----------+-----+--------------------------+-------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+
|Accident_Index|Location_Easting_OSGR|Location_Northing_OSGR|Longitude| Latitude|Police_Force|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|      Date|Day_of_Week| Time|Local_Authority_(District)|Local_Authority_(Highway)|1st_Road_Class|1st_Road_Number|Road_Type|Speed_limit|Junction_Detail|Junction_Control|2nd_Road_Class|2nd_Road_Number|Pedestrian_Crossing-Human_

In [25]:
print(len(df.columns))

32


In [26]:
#remove rows containing NA
df=df.na.drop()

In [29]:
#set the hive context
hc=HiveContext(sc)
hc.setConf("hive.metastore.uris","thrift://METASTORE:9083")

In [30]:
#registering the dataframe
df.registerTempTable('tab')

In [32]:
res1=hc.sql("select Accident_Index,Time from tab where Time>='00:00' and Time<='05:59'")
res2=hc.sql("select Accident_Index,Time from tab where Time>='06:00' and Time<='11:59'")
res3=hc.sql("select Accident_Index,Time from tab where Time>='12:00' and Time<='17:59'")
res4=hc.sql("select Accident_Index,Time from tab where Time>='18:00' and Time<='23:59'")

In [41]:
#map time to rdd
time1=res1.map(lambda p:(p[0],1))
time2=res2.map(lambda p:(p[0],2))
time3=res3.map(lambda p:(p[0],3))
time4=res4.map(lambda p:(p[0],4))

In [56]:
#Union all the time to form a single rdd 
time=time1.union(time2).union(time3).union(time4)
#final timerdd to be included in the dataframe
finaltime=time.map(lambda p:(p[0],p[1]))

In [57]:
#Convert to dataframe and register to hive
finaltimedf=sparksession.createDataFrame(finaltime)

In [58]:
#give names to the columns
newnames=['Accident_Index','Time']
finaltimedf=finaltimedf.toDF(*newnames)
print(finaltimedf.show(20))

+--------------+----+
|Accident_Index|Time|
+--------------+----+
| 201501BS70022|   1|
| 201501BS70033|   1|
| 201501BS70039|   1|
| 201501BS70141|   1|
| 201501BS70175|   1|
| 201501BS70189|   1|
| 201501BS70232|   1|
| 201501BS70249|   1|
| 201501BS70260|   1|
| 201501BS70327|   1|
| 201501BS70336|   1|
| 201501BS70341|   1|
| 201501BS70351|   1|
| 201501BS70360|   1|
| 201501BS70364|   1|
| 201501BS70421|   1|
| 201501BS70439|   1|
| 201501BS70455|   1|
| 201501BS70473|   1|
| 201501BS70484|   1|
+--------------+----+
only showing top 20 rows

None


In [59]:
#store the table in hive
finaltimedf.registerTempTable('time')

In [60]:
#join the tab and time on Accident_Index
finaldf=hc.sql("select tt.*,ht.Time from tab tt,time ht where tt.Accident_Index==ht.Accident_Index")

In [62]:
print(finaldf.show(40))

+--------------+---------------------+----------------------+---------+---------+------------+-----------------+------------------+--------------------+----------+-----------+-----+--------------------------+-------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+----+
|Accident_Index|Location_Easting_OSGR|Location_Northing_OSGR|Longitude| Latitude|Police_Force|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|      Date|Day_of_Week| Time|Local_Authority_(District)|Local_Authority_(Highway)|1st_Road_Class|1st_Road_Number|Road_Type|Speed_limit|Junction_Detail|Junction_Control|2nd_Road_Class|2nd_Road_Number|Pedestrian_Crossing-H

In [63]:
print(finaldf.columns)

['Accident_Index', 'Location_Easting_OSGR', 'Location_Northing_OSGR', 'Longitude', 'Latitude', 'Police_Force', 'Accident_Severity', 'Number_of_Vehicles', 'Number_of_Casualties', 'Date', 'Day_of_Week', 'Time', 'Local_Authority_(District)', 'Local_Authority_(Highway)', '1st_Road_Class', '1st_Road_Number', 'Road_Type', 'Speed_limit', 'Junction_Detail', 'Junction_Control', '2nd_Road_Class', '2nd_Road_Number', 'Pedestrian_Crossing-Human_Control', 'Pedestrian_Crossing-Physical_Facilities', 'Light_Conditions', 'Weather_Conditions', 'Road_Surface_Conditions', 'Special_Conditions_at_Site', 'Carriageway_Hazards', 'Urban_or_Rural_Area', 'Did_Police_Officer_Attend_Scene_of_Accident', 'LSOA_of_Accident_Location', 'Time']


In [67]:
cols_in=['Police_Force','Number_of_Vehicles','Number_of_Casualties','Day_of_Week','first_Road_Class','Road_Type','Speed_limit','Junction_Detail','Junction_Control','second_Road_Class','Pedestrian_Crossing_Human_Control','Pedestrian_Crossing_Physical_Facilities','Light_Conditions','Weather_Conditions','Road_Surface_Conditions','Special_Conditions_at_Site','Carriageway_Hazards','Urban_or_Rural_Area','Time']

In [68]:
cols_out=['Polic_Force1','Number_of_Vehicles1','Number_of_Casualties1','Day_of_Week1']

In [85]:
#this code referred from https://weiminwang.blog/2016/06/09/pyspark-tutorial-building-a-random-forest-binary-classifier-on-unbalanced-dataset/
#converting some cols with discrete values to categorical
indexers = [StringIndexer(inputCol=x, outputCol=x+'_tmp')
            for x in cols_in ]
encoders = [OneHotEncoder(dropLast=False, inputCol=x+"_tmp", outputCol=y)
for x,y in zip(cols_in, cols_out)]
tmp = [[i,j] for i,j in zip(indexers, encoders)]
tmp = [i for sublist in tmp for i in sublist]

In [86]:
#pipeline the stages to be executed
vec=VectorAssembler(inputCols=cols_out,outputCol="features")

In [87]:
#Label Indexer for response variable
labels=StringIndexer(inputCol='Accident_Severity',outputCol="label")

In [88]:
tmp=tmp+[vec,labels]
pipeline=Pipeline(stages=tmp)

In [89]:
data=pipeline.fit(finaldf).transform(finaldf)

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'


In [90]:
#store intermediate
data.cache()

DataFrame[Accident_Index: string, Location_Easting_OSGR: int, Location_Northing_OSGR: int, Longitude: double, Latitude: double, Police_Force: int, Accident_Severity: int, Number_of_Vehicles: int, Number_of_Casualties: int, Date: string, Day_of_Week: int, Time: string, Local_Authority_(District): int, Local_Authority_(Highway): string, 1st_Road_Class: int, 1st_Road_Number: int, Road_Type: int, Speed_limit: int, Junction_Detail: int, Junction_Control: int, 2nd_Road_Class: int, 2nd_Road_Number: int, Pedestrian_Crossing-Human_Control: int, Pedestrian_Crossing-Physical_Facilities: int, Light_Conditions: int, Weather_Conditions: int, Road_Surface_Conditions: int, Special_Conditions_at_Site: int, Carriageway_Hazards: int, Urban_or_Rural_Area: int, Did_Police_Officer_Attend_Scene_of_Accident: int, LSOA_of_Accident_Location: string, Time: bigint, Police_Force_tmp: double, Polic_Force1: vector, Number_of_Vehicles_tmp: double, Number_of_Vehicles1: vector, Number_of_Casualties_tmp: double, Number_

In [91]:
#train test split
train,test=data.randomSplit([0.7,0.3],seed=123)

In [93]:
print(data.columns)

['Accident_Index', 'Location_Easting_OSGR', 'Location_Northing_OSGR', 'Longitude', 'Latitude', 'Police_Force', 'Accident_Severity', 'Number_of_Vehicles', 'Number_of_Casualties', 'Date', 'Day_of_Week', 'Time', 'Local_Authority_(District)', 'Local_Authority_(Highway)', '1st_Road_Class', '1st_Road_Number', 'Road_Type', 'Speed_limit', 'Junction_Detail', 'Junction_Control', '2nd_Road_Class', '2nd_Road_Number', 'Pedestrian_Crossing-Human_Control', 'Pedestrian_Crossing-Physical_Facilities', 'Light_Conditions', 'Weather_Conditions', 'Road_Surface_Conditions', 'Special_Conditions_at_Site', 'Carriageway_Hazards', 'Urban_or_Rural_Area', 'Did_Police_Officer_Attend_Scene_of_Accident', 'LSOA_of_Accident_Location', 'Time', 'Police_Force_tmp', 'Polic_Force1', 'Number_of_Vehicles_tmp', 'Number_of_Vehicles1', 'Number_of_Casualties_tmp', 'Number_of_Casualties1', 'Day_of_Week_tmp', 'Day_of_Week1', 'features', 'label']


In [94]:
#Decision Tree
dt=DecisionTreeClassifier(labelCol="label",featuresCol="features")
model=dt.fit(train)

In [95]:
preds=model.transform(test)

In [96]:
preds.select('prediction','label','features').show(5)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(87,[0,44,58,82],...|
|       0.0|  0.0|(87,[0,44,58,85],...|
|       0.0|  0.0|(87,[0,43,58,83],...|
|       0.0|  0.0|(87,[0,43,58,81],...|
|       0.0|  0.0|(87,[0,43,58,81],...|
+----------+-----+--------------------+
only showing top 5 rows



In [103]:
#Evaluating
evaluator=MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction',metricName='accuracy')
accuracy=evaluator.evaluate(preds)
print('Test Error='+str((1.0-accuracy)))
print('Accuracy Percentage='+str((accuracy*100)))

Test Error=0.15112040176409103
Accuracy Percentage=84.8879598235909


In [108]:
#Logistic Regression
lr=LogisticRegression(maxIter=20,regParam=0.3,elasticNetParam=0.8)
lrModel=lr.fit(train)

In [111]:
#Coefficients
print('Coefficients:'+str(lrModel.coefficientMatrix))
print('Intercept:'+str(lrModel.interceptVector))

Coefficients:3 X 87 CSRMatrix

Intercept:[2.035590233029595,0.2533337466725546,-2.28892397970215]


In [113]:
lpreds=lrModel.transform(test)

In [115]:
evaluator=MulticlassClassificationEvaluator(metricName='accuracy')
accuracy=evaluator.evaluate(lpreds)
print('Test Error='+str((1.0-accuracy)))
print('Accuracy Percentage='+str((accuracy*100)))

Test Error=0.15071251943814212
Accuracy Percentage=84.92874805618578
