In [1]:
# Section must be included at the beginning of each new notebook. Remember to change the app name. 
# If you're using VirtualBox, change the below to '/home/user/spark-2.1.1-bin-hadoop2.7'
import findspark
findspark.init('/opt/spark')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

In [2]:
# Let's read in the data. If you open the dataset, you'll find that each column has a header. We specify that by stating that header=True.
# To make our lives easier, we can also use 'inferSchema' when importing CSVs. This automatically detects data types.
# If you would like to manually change data types, refer to this article: https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803
df = spark.read.csv('Datasets/aggreated_data.csv',header=True,inferSchema=True)

In [3]:
df = df.na.drop()

# df = df.na.drop(subset="road_surface_conditions")
# df = df.na.drop(subset="light_conditions")
# df = df.na.drop(subset="weather_conditions")
# df = df.na.drop(subset="accident_severity")
# df = df.na.drop(subset="day_of_week")

# df = df.na.drop(subset="special_conditions_at_site")
# df = df.na.drop(subset="pedestrian_movement")
df = df.na.drop(subset="age_of_vehicle")
df = df.na.drop(subset="sex_of_driver")
df = df.na.drop(subset="age_of_driver")
df = df.na.drop(subset="junction_location")
df = df.na.drop(subset="junction_detail")
df = df.na.drop(subset="junction_control")
df = df.na.drop(subset="day_of_week")
df = df.na.drop(subset="accident_severity")



print("Total data points2:", df.count())

Total data points2: 285331


In [4]:
features = [
          'age_of_vehicle','sex_of_driver','age_of_driver','junction_location','junction_detail','junction_control','day_of_week','accident_severity']
# features_s = ['light_conditions','weather_conditions']


df1 = df.select(*features)
df1 = df1.filter(df1.accident_severity > 1)
df1 = df1.filter(df1.age_of_vehicle > 0)
df1 = df1.filter(df1.junction_control > 0)




In [5]:
from numpy.random import randint
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
 
RATIO_ADJUST = 2.0 ## ratio of pos to neg in the df_subsample
 
counts = df1.select('accident_severity').groupBy('accident_severity').count().collect()


higherBound = counts[0][1]

TRESHOLD_TO_FILTER = int(RATIO_ADJUST * float(counts[1][1]) / counts[0][1] * higherBound)


randGen = lambda x: randint(0, higherBound) if x != '3' else -1
 
udfRandGen = udf(randGen, IntegerType())
df1 = df1.withColumn("randIndex", udfRandGen("accident_severity"))



df1 = df1.filter(df1['randIndex'] > TRESHOLD_TO_FILTER)

df1.count()
#print("Before down-sample data ammount", df1.count())
#print("After down-sample data ammount", df_subsample.count())
#print("Distribution of 3 and 2 cases of the down-sampled training data are: \n", df_subsample.groupBy("accident_severity").count().take(3))
#df1 = df_subsample.drop('randIndex')



87064

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
# categoricalColumns = ['road_surface_conditions','light_conditions','weather_conditions']

ageofvehicle_indexer = StringIndexer(inputCol='age_of_vehicle',outputCol='ageofvehicleIndexer').setHandleInvalid("skip")
sexofdriver_indexer = StringIndexer(inputCol='sex_of_driver',outputCol='sexofdriverIndexer').setHandleInvalid("skip")
ageofdriver_indexer = StringIndexer(inputCol='age_of_driver',outputCol='ageofdriverIndexer').setHandleInvalid("skip")
junctionlocation_indexer = StringIndexer(inputCol='junction_location',outputCol='junctionlocationIndexer').setHandleInvalid("skip")
junctiondetail_indexer = StringIndexer(inputCol='junction_detail',outputCol='junctiondetailIndexer').setHandleInvalid("skip")
junctioncontrol_indexer = StringIndexer(inputCol='junction_control',outputCol='junctioncontrolIndexer').setHandleInvalid("skip")
dayofweek_indexer = StringIndexer(inputCol='day_of_week',outputCol='dayofweekIndexer').setHandleInvalid("skip")


accidentSeverity_indexer = StringIndexer(inputCol='accident_severity',outputCol='label').setHandleInvalid("skip")


# road_surface_conditions_encoder = OneHotEncoder(inputCol='road_surface_conditions_indexer',outputCol='road_surface_conditionsVec')
ageofvehicle_encoder = OneHotEncoder(inputCol='ageofvehicleIndexer',outputCol='ageofvehicleVec')
sexofdriver_encoder = OneHotEncoder(inputCol='sexofdriverIndexer',outputCol='sexofdriverVec')
ageofdriver_encoder = OneHotEncoder(inputCol='ageofdriverIndexer',outputCol='ageofdriverVec')
junctionlocation_encoder = OneHotEncoder(inputCol='junctionlocationIndexer',outputCol='junctionlocationVec')
junctiondetail_encoder = OneHotEncoder(inputCol='junctiondetailIndexer',outputCol='junctiondetailVec')
junctioncontrol_encoder = OneHotEncoder(inputCol='junctioncontrolIndexer',outputCol='junctioncontrolVec')
dayofweek_encoder = OneHotEncoder(inputCol='dayofweekIndexer',outputCol='dayofweekVec')


# And finally, using vector assembler to turn all of these columns into one column (named features).
assembler = VectorAssembler(inputCols=['ageofvehicleVec','sexofdriverVec','ageofdriverVec','junctionlocationVec','junctiondetailVec','junctioncontrolVec','dayofweekVec'], outputCol="features")

In [7]:
from pyspark.ml import Pipeline

# Then go through our steps. It's essentially sequential to the above.
pipeline = Pipeline(stages=[ageofvehicle_indexer, sexofdriver_indexer, ageofdriver_indexer,junctionlocation_indexer,junctiondetail_indexer,junctioncontrol_indexer,dayofweek_indexer,
                            accidentSeverity_indexer,
                            ageofvehicle_encoder, sexofdriver_encoder,ageofdriver_encoder,junctionlocation_encoder,junctiondetail_encoder,junctioncontrol_encoder,dayofweek_encoder,
                            assembler])

# Now that we've got a number of steps, let's apply it to the DataFrame.
pipeline_model = pipeline.fit(df1)

# Incorporate results into a new DataFrame.
pipe_df = pipeline_model.transform(df1)

# Remove all variables other than features and label. 
pipe_df = pipe_df.select('label', 'features')

In [8]:
pipe_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(170,[0,61,65,144...|
|  0.0|(170,[1,61,73,146...|
|  0.0|(170,[4,60,83,146...|
|  0.0|(170,[0,60,66,145...|
|  0.0|(170,[0,61,84,144...|
|  0.0|(170,[0,61,68,144...|
|  0.0|(170,[15,61,97,14...|
|  0.0|(170,[8,60,80,145...|
|  0.0|(170,[10,60,78,14...|
|  0.0|(170,[1,60,142,14...|
|  0.0|(170,[6,60,73,144...|
|  0.0|(170,[0,61,97,144...|
|  1.0|(170,[4,61,78,144...|
|  0.0|(170,[1,61,114,14...|
|  0.0|(170,[6,61,73,147...|
|  0.0|(170,[4,60,78,146...|
|  0.0|(170,[10,61,78,14...|
|  0.0|(170,[9,61,87,146...|
|  0.0|(170,[11,60,75,14...|
|  0.0|(170,[0,61,75,148...|
+-----+--------------------+
only showing top 20 rows



In [9]:
from pyspark.ml.classification import DecisionTreeClassifier

# Split our data. Note that the new DataFrame is being used.
train_data1, test_data1 = pipe_df.randomSplit([0.8,0.2])
#print("Training Dataset Count: " + str(train_data1.count()))
#print("Test Dataset Count: " + str(test_data1.count()))



In [10]:
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train_data1)

predictions = dtModel.transform(test_data1)

#setHandleInvalid("skip")

In [11]:
predictions.show()

+-----+--------------------+--------------+--------------------+----------+
|label|            features| rawPrediction|         probability|prediction|
+-----+--------------------+--------------+--------------------+----------+
|  0.0|(170,[0,60,63,144...|[1798.0,130.0]|[0.93257261410788...|       0.0|
|  0.0|(170,[0,60,63,144...|[1798.0,130.0]|[0.93257261410788...|       0.0|
|  0.0|(170,[0,60,63,144...|[1798.0,130.0]|[0.93257261410788...|       0.0|
|  0.0|(170,[0,60,63,144...|[1798.0,130.0]|[0.93257261410788...|       0.0|
|  0.0|(170,[0,60,63,144...|[1798.0,130.0]|[0.93257261410788...|       0.0|
|  0.0|(170,[0,60,63,144...|[1798.0,130.0]|[0.93257261410788...|       0.0|
|  0.0|(170,[0,60,63,144...|[1798.0,130.0]|[0.93257261410788...|       0.0|
|  0.0|(170,[0,60,63,144...|[1798.0,130.0]|[0.93257261410788...|       0.0|
|  0.0|(170,[0,60,63,144...|[1798.0,130.0]|[0.93257261410788...|       0.0|
|  0.0|(170,[0,60,63,144...|[1798.0,130.0]|[0.93257261410788...|       0.0|
|  0.0|(170,

In [12]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.5689420614444342
