# Final Project Stage I

### Import Dataset to MongoDB 

In [1]:
import ast
import json
import os

import pandas as pd
from pymongo import MongoClient

In [3]:
# Connect to the MongoDB server.
# Credentials are read from the environment (MONGO_USERNAME / MONGO_PASSWORD) so
# they do not have to be stored in the notebook. The fallbacks reproduce the
# values previously hardcoded here, so the cell still runs when the variables
# are unset.
mongo_username = os.environ.get("MONGO_USERNAME", "dj2145")
mongo_password = os.environ.get("MONGO_PASSWORD", "dj2145")

client = MongoClient('mongo-csgy-6513-fall.db:27017',
                     username=mongo_username,
                     password=mongo_password,
                     authSource="db_dj2145")

mydb = client['db_dj2145']

# Collection that receives the raw accident records.
collection_accidents = mydb.accidents

In [None]:
# dataset source url: https://www.kaggle.com/daveianhickey/2000-16-traffic-flow-england-scotland-wales
# Read the historical accident data: one CSV per period, each loaded into its own
# DataFrame and converted to a list of plain dict records for MongoDB.
# The JSON round-trip turns missing values (NaN) into null/None.
# NOTE(review): previously all three reads used the 2005-2007 file and all three
# payloads were built from acc_data, so the same records were inserted three times.
# The file names for the 2nd and 3rd periods are assumed from the Kaggle download —
# confirm they match the local copies.
acc_data = pd.read_csv("accidents_2005_to_2007.csv")
payload = json.loads(acc_data.to_json(orient='records'))

acc_data_2 = pd.read_csv("accidents_2009_to_2011.csv")
payload_2 = json.loads(acc_data_2.to_json(orient='records'))

acc_data_3 = pd.read_csv("accidents_2012_to_2014.csv")
payload_3 = json.loads(acc_data_3.to_json(orient='records'))

In [None]:
# Bulk-insert each batch of records into the accidents collection, in order.
for record_batch in (payload, payload_2, payload_3):
    collection_accidents.insert_many(record_batch)

### Spark MongoDB Connector

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Build (or reuse) a local Spark session configured for the MongoDB Spark
# connector; the same connection URI is used for both input and output.
mongo_uri = "mongodb://dj2145:dj2145@mongo-csgy-6513-fall.db/db_dj2145"

spark = (
    SparkSession.builder
    .appName("mongodbspark")
    .master('local')
    .config("spark.mongodb.input.uri", mongo_uri)
    .config("spark.mongodb.output.uri", mongo_uri)
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/conda/envs/bigdata/lib/python3.7/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4b2e9046-a262-40f7-b11c-0e60a38b36bb;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 526ms :: artifacts dl 14ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts

In [5]:
# Load the accidents collection from MongoDB into a Spark DataFrame.
accidents = (
    spark.read
    .format('com.mongodb.spark.sql.DefaultSource')
    .option("uri", "mongodb://dj2145:dj2145@mongo-csgy-6513-fall.db:27017/db_dj2145.accidents")
    .load()
)

                                                                                

### Data cleaning

Preprocess the raw data from the database and transform it into cleaned data for further analysis and prediction work. The primary checks cover null values, type errors, missing fields, etc.

In [8]:
# Discard every row that has a null in any of the core fields listed below.
core_fields = [
    "Accident_Index",
    "Accident_Severity",
    "Date",
    "Latitude",
    "Light_Conditions",
    "Longitude",
    "Number_of_Casualties",
    "Number_of_Vehicles",
    "Speed_limit",
    "Time",
    "Urban_or_Rural_Area",
    "Weather_Conditions",
    "Year",
]
accidents = accidents.na.drop(subset=core_fields)

In [11]:
# Discard every row that has a null in any field needed for ML training.
ml_fields = [
    "Pedestrian_Crossing-Physical_Facilities",
    "Light_Conditions",
    "Weather_Conditions",
    "Road_Surface_Conditions",
    "Special_Conditions_at_Site",
    "Carriageway_Hazards",
    "Urban_or_Rural_Area",
    "Speed_limit",
    "Did_Police_Officer_Attend_Scene_of_Accident",
    "Day_of_Week",
    "Road_Type",
    "1st_Road_Class",
    "2nd_Road_Class",
    'Pedestrian_Crossing-Human_Control',
]
accidents = accidents.na.drop(subset=ml_fields)

In [18]:
# write to MongoDB test
#df.write.format("mongo").mode("append").option("database",
#"db_dj2145").option("collection", "contacts").save()

                                                                                

In [12]:
# Remove columns that are not carried forward:
#   _id             - the Mongo ObjectId (it will be regenerated when writing back to MongoDB)
#   Junction_Detail - excluded from the cleaned data set
# Dropping by name is a no-op for columns that are already gone, so this cell can
# be re-run safely; attribute access such as accidents._id raises AttributeError
# once the column has been removed.
accidents = accidents.drop("_id", "Junction_Detail")

In [13]:
# Print the DataFrame schema to inspect column names and their types.
accidents.printSchema()

root
 |-- 1st_Road_Class: integer (nullable = true)
 |-- 1st_Road_Number: integer (nullable = true)
 |-- 2nd_Road_Class: integer (nullable = true)
 |-- 2nd_Road_Number: integer (nullable = true)
 |-- Accident_Index: string (nullable = true)
 |-- Accident_Severity: integer (nullable = true)
 |-- Carriageway_Hazards: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Day_of_Week: integer (nullable = true)
 |-- Did_Police_Officer_Attend_Scene_of_Accident: string (nullable = true)
 |-- Junction_Control: string (nullable = true)
 |-- LSOA_of_Accident_Location: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Light_Conditions: string (nullable = true)
 |-- Local_Authority_(District): integer (nullable = true)
 |-- Local_Authority_(Highway): string (nullable = true)
 |-- Location_Easting_OSGR: double (nullable = true)
 |-- Location_Northing_OSGR: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Number_of_Casualties: integer (nullable =

In [14]:
# Enforce the expected Spark type on each numeric field: coordinates as Double,
# counts, year, area code, severity and speed limit as Integer.
numeric_casts = [
    ("Latitude", "Double"),
    ("Longitude", "Double"),
    ("Number_of_Casualties", "Integer"),
    ("Number_of_Vehicles", "Integer"),
    ("Year", "Integer"),
    ("Urban_or_Rural_Area", "Integer"),
    ("Accident_Severity", "Integer"),
    ("Speed_limit", "Integer"),
]
for field_name, spark_type in numeric_casts:
    accidents = accidents.withColumn(field_name, accidents[field_name].cast(spark_type))

In [16]:
# Cast Urban_or_Rural_Area to Integer.
# NOTE(review): the preceding type-check cell already applies this same cast,
# so this looks redundant — confirm before removing.
accidents = accidents.withColumn("Urban_or_Rural_Area",accidents["Urban_or_Rural_Area"].cast("Integer"))

In [17]:
# Preview the date and severity of accidents whose severity code is below 2.
date_and_severity = accidents.select(accidents["Date"], accidents["Accident_Severity"])
date_and_severity.where(accidents["Accident_Severity"] < 2).show()

[Stage 1:>                                                          (0 + 1) / 1]

+----------+-----------------+
|      Date|Accident_Severity|
+----------+-----------------+
|18/05/2005|                1|
|09/11/2005|                1|
|06/01/2005|                1|
|02/02/2005|                1|
|29/01/2005|                1|
|24/03/2005|                1|
|11/04/2005|                1|
|27/04/2005|                1|
|11/03/2005|                1|
|01/04/2005|                1|
|08/05/2005|                1|
|25/05/2005|                1|
|10/06/2005|                1|
|15/07/2005|                1|
|14/07/2005|                1|
|13/08/2005|                1|
|02/09/2005|                1|
|30/09/2005|                1|
|06/10/2005|                1|
|20/10/2005|                1|
+----------+-----------------+
only showing top 20 rows



                                                                                

Recode the Accident_Severity classes for the machine-learning dataset.

In [18]:
from pyspark.sql import functions as F

In [19]:
# Bind a second name to the cleaned DataFrame for the ML-specific recoding.
# No data is copied here: the recoding cells below assign the result of
# withColumn back to training_set only, leaving accidents as it is.
training_set = accidents

In [21]:
# Collapse 3 severity classes into 2, step 1: recode severity 3 as 0 and
# leave every other value unchanged.
severity = F.col("Accident_Severity")
training_set = training_set.withColumn(
    "Accident_Severity",
    F.when(severity == 3, 0).otherwise(severity),
)

In [22]:
# Recode every severity value greater than 0 as 1 and leave the rest unchanged.
severity = F.col("Accident_Severity")
training_set = training_set.withColumn(
    "Accident_Severity",
    F.when(severity > 0, 1).otherwise(severity),
)

In [25]:
# Preview the recoded severity column.
training_set.select("Accident_Severity").show()

+-----------------+
|Accident_Severity|
+-----------------+
|                1|
|                0|
|                0|
|                0|
|                0|
|                0|
|                0|
|                0|
|                0|
|                0|
|                0|
|                0|
|                0|
|                0|
|                0|
|                0|
|                1|
|                0|
|                0|
|                1|
+-----------------+
only showing top 20 rows



In [26]:
# Preview the training set with all of its columns.
training_set.show()

+--------------+---------------+--------------+---------------+--------------+-----------------+-------------------+----------+-----------+-------------------------------------------+--------------------+-------------------------+---------+--------------------+--------------------------+-------------------------+---------------------+----------------------+---------+--------------------+------------------+---------------------------------+---------------------------------------+------------+-----------------------+------------------+--------------------------+-----------+-----+-------------------+--------------------+----+
|1st_Road_Class|1st_Road_Number|2nd_Road_Class|2nd_Road_Number|Accident_Index|Accident_Severity|Carriageway_Hazards|      Date|Day_of_Week|Did_Police_Officer_Attend_Scene_of_Accident|    Junction_Control|LSOA_of_Accident_Location| Latitude|    Light_Conditions|Local_Authority_(District)|Local_Authority_(Highway)|Location_Easting_OSGR|Location_Northing_OSGR|Longitude|

### Store the data in a new collection in MongoDB for next usage

In [34]:
# Append the cleaned accidents to the filter_data collection in MongoDB.
filtered_writer = (
    accidents.write
    .format("mongo")
    .mode("append")
    .option("database", "db_dj2145")
    .option("collection", "filter_data")
)
filtered_writer.save()

                                                                                

In [27]:
# Append the recoded training set to the ML_data collection in MongoDB.
training_writer = (
    training_set.write
    .format("mongo")
    .mode("append")
    .option("database", "db_dj2145")
    .option("collection", "ML_data")
)
training_writer.save()

                                                                                

In [35]:
# Read the ML_data collection back from MongoDB into a Spark DataFrame.
train = (
    spark.read
    .format('com.mongodb.spark.sql.DefaultSource')
    .option("uri", "mongodb://dj2145:dj2145@mongo-csgy-6513-fall.db:27017/db_dj2145.ML_data")
    .load()
)

                                                                                