<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" />

# Sections
* [Objectives](#0)
* [1. Setup](#1)
  * [1.1 Start Hadoop](#1.1)  
  * [1.2 Create SparkSession](#1.2)
* [2. Creation of the Dataframes](#2)
  * [2.1 Creation of the Dataframe for the accidents of London in 2019](#2.1)
  * [2.2 Shaping the DataFrame to obtain insights](#2.2)
* [3. Insights](#3)
  * [3.1 Insight I: London 2019 accidents heatmap](#3.1)
  * [3.2 Insight II: For each borough, the mode with the highest number of accidents](#3.2)
  * [3.3 Insight III: Hour of the day where the probability of fatal accident is the highest for each age category](#3.3)
  * [3.4 Insight IV: For each severity, the type of vehicle with the highest probability of accident](#3.4)
* [4. Tear Down](#4)
  * [4.1 Stop Hadoop](#4.1)

<a id='0'></a>
## Objectives
<p>
<div>The main objectives of this project are:</div>
<ul>
    <li>Create a Dataframe from multiple JSON files in multiple directories.</li>
    <li>Deal with complex data structures such as arrays in JSON documents.</li>
    <li>Provide insights from the created Dataframes.</li>
</ul>    
</p>

<a id='1'></a>
## 1. Setup

<a id='1.1'></a>
### 1.1 Start Hadoop

Open a terminal and execute the following command:
```sh
hadoop-start.sh
```

<a id='1.2'></a>
### 1.2 Create SparkSession

First, we locate the Spark installation.

In [1]:
#Import libraries
import findspark
import pandas as pd
#Search Spark installation
findspark.init()

Next, we create the Spark Session with the name *Accidents for London 2019 - Data Analysis*.

We also check the Spark version we are using to ensure it is correct.

In [2]:
from pyspark.sql.session import SparkSession

#Create Spark Session
spark = SparkSession.builder\
                    .appName("Accidents for London 2019 - Data Analysis")\
                    .getOrCreate()

#Print Spark version
print(f"This cluster relies on Spark '{spark.version}'")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


This cluster relies on Spark '3.2.1'


<a id='2'></a>
## 2. Creation of the Dataframes

<a id='2.1'></a>
### 2.1 Creation of the Dataframe for the accidents of London in 2019

First, we read the JSON files for London's 2019 accidents that we saved in HDFS using NiFi.

Note that these JSON files are stored in multiple directories according to the date of each accident. With this in mind, we read every JSON file under the accidents directory for 2019.

In [3]:
from  pyspark.sql.functions import input_file_name

# DataFrame creation
AccidentsData2019London = spark.read.json("hdfs://localhost:9000/datalake/raw/tfl/accidents/2019/*/*/*")

# DataFrame updated with an additional column containing the filename contributing to the data in every row.
AccidentsData2019London = AccidentsData2019London.withColumn("filename_path", input_file_name())

# The inferred schema can be visualized using the printSchema() method - definitely semi-structured data.
AccidentsData2019London.printSchema()

                                                                                

root
 |-- $type: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- casualties: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- $type: string (nullable = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- ageBand: string (nullable = true)
 |    |    |-- class: string (nullable = true)
 |    |    |-- mode: string (nullable = true)
 |    |    |-- severity: string (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lat: double (nullable = true)
 |-- location: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- severity: string (nullable = true)
 |-- vehicles: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- $type: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- filename_path: string (nullable = false)
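
Each `*` in the path `2019/*/*/*` matches exactly one directory level or file name, so only files sitting two directories below `2019` are read. The sketch below reproduces that matching on a throwaway local directory tree with Python's `glob` module. The `month/day/file` layout and the file names are assumptions made for illustration; the text only states that files are organised by accident date.

```python
import glob
import os
import tempfile

def matching_files(root):
    """Return the paths matched by <root>/2019/*/*/*, relative to root."""
    pattern = os.path.join(root, "2019", "*", "*", "*")
    return sorted(os.path.relpath(path, root) for path in glob.glob(pattern))

with tempfile.TemporaryDirectory() as root:
    # Hypothetical layout: 2019/<month>/<day>/<file>
    for rel in ["2019/01/15/a.json", "2019/01/16/b.json",
                "2019/02/01/c.json", "2019/readme.txt"]:
        path = os.path.join(root, *rel.split("/"))
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()
    matched = matching_files(root)

# readme.txt sits directly under 2019/, so the three-level pattern skips it
print(matched)
```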



<a id='2.2'></a>
### 2.2 Shaping the DataFrame to obtain insights

In the JSON files for the accidents in London 2019, we can see that the casualties and the vehicles involved in each accident are stored as arrays of values.

This is an example of an array inside the vehicles column.

In [4]:
AccidentsData2019London.limit(1).select("vehicles").show(1,False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|vehicles                                                                                                                                                                                              |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{Tfl.Api.Presentation.Entities.AccidentStats.Vehicle, Tfl.Api.Presentation.Entities, Motorcycle_50_125cc}, {Tfl.Api.Presentation.Entities.AccidentStats.Vehicle, Tfl.Api.Presentation.Entities, Car}]|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

We need to shape the Dataframe so that this information is accessible and we can obtain insights from it.

Our solution is a main Dataframe containing the general information of the accidents, with one row for each individual involved. Alongside it, an extra Dataframe contains the vehicles involved in each accident, with one row per vehicle.

First, let's take a look at the main Dataframe with the information of the accident and the individuals involved.

In order to *flatten* an array value, you need to use the [PySpark function explode](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.sql.functions.explode.html); the *explode* and *withColumn* functions will:

- take every single element from the array (every JSON document in our case)
- create a new row per element (JSON document in our case) keeping the original values for all the columns (including the array one).
- add a new column with the corresponding element (JSON document in our case) from the array.
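
The three steps listed above can be mimicked in plain Python, without Spark, to make the row multiplication concrete. This is only a sketch of the semantics: `explode_rows` is a hypothetical helper, and the accident record is made up. As with Spark's `explode`, a row whose array is empty or missing produces no output rows.

```python
def explode_rows(rows, array_key, new_key):
    """Yield one output row per array element, copying all original fields."""
    for row in rows:
        for element in row.get(array_key) or []:
            # Keep every original column (including the array) and add the element
            yield {**row, new_key: element}

accident = {
    "id": 1,  # made-up accident id
    "casualties": [
        {"age": 30, "severity": "Slight"},
        {"age": 8, "severity": "Serious"},
    ],
}
exploded = list(explode_rows([accident], "casualties", "casualty"))
for row in exploded:
    print(row["id"], row["casualty"])
```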

This is an example of exploding the casualties column for one accident with multiple individuals involved.

In [5]:
from pyspark.sql.functions import explode, col

number_of_elements = AccidentsData2019London.filter(col("id") == 717149).limit(1)\
                                           .withColumn("casualty", explode("casualties"))\
                                           .count()
print(f"Starting with 1 row and ending up with {number_of_elements} rows.")



Starting with 1 row and ending up with 2 rows.


                                                                                

Taking into account how this *explode* function works, we can use it to flatten the casualties array and create a new Dataframe.

In [6]:
#Renaming the severity column of the dataframe to avoid confusing it with the severity of each casualty
AccidentsData2019London = AccidentsData2019London.withColumnRenamed("severity","accident_severity")

#Creation of the Dataframe for the casualties
AccidentsDataCasualties = AccidentsData2019London.withColumn("casualty", explode("casualties"))\
                                            .select("id", "borough", "date", "lat", "lon", "location", "accident_severity",
                                                    "casualty.age", "casualty.ageBand", "casualty.class", "casualty.mode", \
                                                    "casualty.severity")
#Printing the schema
AccidentsDataCasualties.printSchema()

root
 |-- id: long (nullable = true)
 |-- borough: string (nullable = true)
 |-- date: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- location: string (nullable = true)
 |-- accident_severity: string (nullable = true)
 |-- age: long (nullable = true)
 |-- ageBand: string (nullable = true)
 |-- class: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- severity: string (nullable = true)



Let's display the first 5 rows of the Dataframe **AccidentsDataCasualties**.

In [7]:
AccidentsDataCasualties.limit(5).toPandas()

                                                                                

Unnamed: 0,id,borough,date,lat,lon,location,accident_severity,age,ageBand,class,mode,severity
0,739028,Harrow,2019-11-20T14:50:00Z,51.579375,-0.341793,On Bessborough Road Near The Junction With Low...,Slight,35.0,Adult,Driver,PoweredTwoWheeler,Slight
1,739029,Southwark,2019-11-21T13:28:00Z,51.486978,-0.106019,On De Laune Street Near The Junction With 0,Slight,31.0,Adult,Driver,Car,Slight
2,739030,Sutton,2019-11-21T13:33:00Z,51.364844,-0.192255,Location Uncertain On Throwley Way Near The Ju...,Slight,21.0,Adult,Driver,PoweredTwoWheeler,Slight
3,739031,Havering,2019-11-21T23:40:00Z,51.544569,0.197018,On Wood Lane Near The Junction With Easedale D...,Slight,29.0,Adult,Passenger,Car,Slight
4,739032,Hackney,2019-11-22T02:10:00Z,51.523703,-0.077236,On Holywell Lane Near The Junction With Bethna...,Slight,44.0,Adult,Pedestrian,Pedestrian,Slight


Using the same *explode* function, we are going to create the Dataframe for the vehicles involved in the accident.

In [8]:
#Creation of the Dataframe for the vehicles
AccidentsDataVehicles = AccidentsData2019London.withColumn("vehicle", explode("vehicles"))\
                                            .select("id", "vehicle.type")

#Printing the schema
AccidentsDataVehicles.printSchema()

root
 |-- id: long (nullable = true)
 |-- type: string (nullable = true)



Let's display the first 5 rows of the Dataframe **AccidentsDataVehicles**.

In [9]:
AccidentsDataVehicles.limit(5).toPandas()

Unnamed: 0,id,type
0,739028,Motorcycle_50_125cc
1,739028,Car
2,739029,Car
3,739029,Car
4,739030,Motorcycle_50_125cc


<a id='3'></a>
## 3. Insights

<a id='3.1'></a>
### 3.1 Insight I: London 2019 accidents heatmap

The Dataframe **AccidentsDataCasualties** contains the precise coordinates of where the accidents occurred. Using these coordinates, we create an interactive heatmap to get a general view of where accidents happen.

To run this code, the *folium* and *geopy* libraries must be installed. If they are not installed in your Python environment, execute the following cells (you may need to restart the kernel to use updated packages):

In [10]:
pip install folium

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [11]:
pip install geopy

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


Once both libraries are installed, you can run the code to create the heatmap.

In [12]:
# import relevant packages / modules
import folium
from folium.plugins import HeatMap
import geopy

# collect the DataFrame to Pandas once and reuse it for both coordinates
accidents_pdf = AccidentsDataCasualties.toPandas()
# extracting longitude and latitude values (one per accident id) to separate lists
longs_column = accidents_pdf.groupby("id")["lon"].unique().str[0]
longs=list(longs_column)
lats_column = accidents_pdf.groupby("id")["lat"].unique().str[0]
lats=list(lats_column)
# calculating mean longitude and latitude values
import statistics
meanLong = statistics.mean(longs)
meanLat = statistics.mean(lats)
# create base map object using Map()
mapObj = folium.Map(location=[meanLat, meanLong], zoom_start = 12.5)
# create heatmap layer
heatmap = HeatMap(list(zip(lats, longs)),
                   min_opacity=0.2,
                   radius=50, blur=50, 
                   max_zoom=1)
# add heatmap layer to base map
heatmap.add_to(mapObj)
mapObj
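
The data preparation behind the heatmap boils down to two steps: keep one coordinate pair per accident id, then centre the map on the mean latitude and longitude. The plain-Python sketch below illustrates both on made-up coordinates. `accident_points` is a hypothetical helper, and skipping rows without coordinates is an assumption of this sketch, not something the notebook code does.

```python
import statistics

def accident_points(casualty_rows):
    """Keep one (lat, lon) pair per accident id, skipping rows without coordinates."""
    points = {}
    for row in casualty_rows:
        if row["lat"] is not None and row["lon"] is not None:
            points.setdefault(row["id"], (row["lat"], row["lon"]))
    return list(points.values())

rows = [  # made-up casualty rows
    {"id": 1, "lat": 51.50, "lon": -0.10},
    {"id": 1, "lat": 51.50, "lon": -0.10},  # second casualty, same accident
    {"id": 2, "lat": 51.52, "lon": -0.08},
]
points = accident_points(rows)
# Map centre: mean latitude and mean longitude over the distinct accidents
centre = (statistics.mean(p[0] for p in points),
          statistics.mean(p[1] for p in points))
print(points, centre)
```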

                                                                                

<a id='3.2'></a>
### 3.2 Insight II: For each borough, the mode with the highest number of accidents

In this analysis, we display, for each borough, the means of transport with the highest number of accidents.

Each accident record includes the London borough where it happened. Thus, we can group by *borough* to obtain the number of accidents in each one.

In [13]:
from pyspark.sql.functions import countDistinct

#Dataframe with the number of accidents per borough
AccidentsDataCasualtiesBorough = AccidentsDataCasualties.groupBy("borough")\
                                   .agg(countDistinct("id").alias("Nº accidents"))\
                                   .orderBy(col("Nº accidents").desc())

#Display it using Pandas
AccidentsDataCasualtiesBorough.toPandas().set_index("borough").head(10)

                                                                                

borough,Nº accidents
City of Westminster,3032
Lambeth,2382
Tower Hamlets,2232
Southwark,2186
Ealing,1966
Croydon,1908
Barnet,1896
Enfield,1894
Wandsworth,1890
Camden,1838
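
Because **AccidentsDataCasualties** holds one row per casualty, counting rows would count an accident once for every person involved. Counting distinct ids avoids that. The sketch below shows the idea in plain Python on made-up rows; `accidents_per_borough` is a hypothetical helper, not part of the notebook.

```python
from collections import defaultdict

def accidents_per_borough(casualty_rows):
    """Count distinct accident ids per borough (one row per casualty as input)."""
    ids_by_borough = defaultdict(set)
    for row in casualty_rows:
        ids_by_borough[row["borough"]].add(row["id"])
    return {borough: len(ids) for borough, ids in ids_by_borough.items()}

rows = [  # made-up casualty rows
    {"id": 1, "borough": "Camden"},
    {"id": 1, "borough": "Camden"},  # second casualty of accident 1
    {"id": 2, "borough": "Camden"},
    {"id": 3, "borough": "Ealing"},
]
counts = accidents_per_borough(rows)
print(counts)
```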


The Dataframe also contains a column with the means of transport of each individual involved in the accident (column *mode*). As an example, let's show all the means of transport for the borough *City of Westminster* with the corresponding number of accidents.

In [14]:
#Dataframe with the modes and number of accidents for the borough City of Westminster
AccidentsDataCasualtiesCoWMode = AccidentsDataCasualties.filter(col("borough") == "City of Westminster") \
                                   .groupBy("mode")\
                                   .agg(countDistinct("id").alias("Nº accidents"))\
                                   .orderBy(col("Nº accidents").desc())

#Display it using Pandas
AccidentsDataCasualtiesCoWMode.toPandas().set_index("mode")

                                                                                

mode,Nº accidents
Pedestrian,908
PedalCycle,756
PoweredTwoWheeler,688
Car,392
BusOrCoach,136
Taxi,128
PrivateHire,70
GoodsVehicle,42
OtherVehicle,28


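Next, we combine the columns *borough* and *mode* to obtain, for each borough, the mode with the most accidents.

To do so, we use Spark SQL to run a query over the Dataframe. Note that the query uses `count(id)`, which counts one row per casualty, whereas the previous tables use `countDistinct("id")`; this is why the figures in this result are larger than the per-accident counts shown earlier.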

In [15]:
#Creation of the SQL view for the AccidentsDataCasualties Dataframe
AccidentsDataCasualties.createOrReplaceTempView("AccidentsDataCasualties_sql_view")

#SQL statement to obtain the list of borough with the mode that has the highest Nº of accidents
spark.sql("""select a.borough, a.mode, a.count
             from (select distinct borough, mode, count(id) as count
                   from AccidentsDataCasualties_sql_view
                   group by 1,2) a inner join (select x.borough, max(x.count) as max_count 
                                               from (select distinct borough,mode, count(id) as count
                                                     from AccidentsDataCasualties_sql_view
                                                     group by 1,2) x
                                               group by 1) b
                                    on a.count = b.max_count and a.borough = b. borough""")\
.toPandas().sort_values("count", ascending = False).set_index("borough")

                                                                                

borough,mode,count
Enfield,Car,5198
Barnet,Car,4608
Newham,Car,4297
Redbridge,Car,4156
Ealing,Car,4111
Croydon,Car,3995
Hillingdon,Car,3945
City of Westminster,Pedestrian,3711
Bromley,Car,3641
Havering,Car,3622
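
The query follows a common "maximum per group" pattern: count per (borough, mode), find the largest count per borough, then keep the pairs that reach it. A plain-Python sketch of the same logic on made-up rows is given here; `top_modes` is a hypothetical helper. Like the inner join in the query, it keeps every mode that ties for first place in a borough.

```python
from collections import Counter

def top_modes(casualty_rows):
    """Return (borough, mode, count) for the most frequent mode(s) in each borough."""
    # Count casualty rows per (borough, mode), like GROUP BY borough, mode
    counts = Counter((row["borough"], row["mode"]) for row in casualty_rows)
    # Largest count per borough, like the inner max(...) grouped by borough
    best = {}
    for (borough, _mode), n in counts.items():
        best[borough] = max(best.get(borough, 0), n)
    # Keep every pair that reaches its borough's maximum (ties are kept)
    return sorted((b, m, n) for (b, m), n in counts.items() if n == best[b])

rows = [  # made-up casualty rows
    {"borough": "Camden", "mode": "Car"},
    {"borough": "Camden", "mode": "Car"},
    {"borough": "Camden", "mode": "Pedestrian"},
    {"borough": "Ealing", "mode": "PedalCycle"},
]
result = top_modes(rows)
print(result)
```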


<a id='3.3'></a>
### 3.3 Insight III: Hour of the day where the probability of fatal accident is the highest for each age category

In this analysis, we obtain, for each age range, the hour of the day at which the probability of a fatal accident is highest. We do this in several steps.

First, we create a new column called *Time* with the time of each accident, extracted from the timestamp provided for each accident.

In [16]:
from pyspark.sql.functions import split,col

#Creation of the column Time
AccidentsDataCasualtiesTime = AccidentsDataCasualties.withColumn("Time", split(col("date"), "T").getItem(1))
#Display it using Pandas
AccidentsDataCasualtiesTime.limit(5).toPandas()

                                                                                

Unnamed: 0,id,borough,date,lat,lon,location,accident_severity,age,ageBand,class,mode,severity,Time
0,739028,Harrow,2019-11-20T14:50:00Z,51.579375,-0.341793,On Bessborough Road Near The Junction With Low...,Slight,35.0,Adult,Driver,PoweredTwoWheeler,Slight,14:50:00Z
1,739029,Southwark,2019-11-21T13:28:00Z,51.486978,-0.106019,On De Laune Street Near The Junction With 0,Slight,31.0,Adult,Driver,Car,Slight,13:28:00Z
2,739030,Sutton,2019-11-21T13:33:00Z,51.364844,-0.192255,Location Uncertain On Throwley Way Near The Ju...,Slight,21.0,Adult,Driver,PoweredTwoWheeler,Slight,13:33:00Z
3,739031,Havering,2019-11-21T23:40:00Z,51.544569,0.197018,On Wood Lane Near The Junction With Easedale D...,Slight,29.0,Adult,Passenger,Car,Slight,23:40:00Z
4,739032,Hackney,2019-11-22T02:10:00Z,51.523703,-0.077236,On Holywell Lane Near The Junction With Bethna...,Slight,44.0,Adult,Pedestrian,Pedestrian,Slight,02:10:00Z


Afterwards, we use the previously generated *Time* column to create a new one called *Hour*, indicating the hour of each accident.

In [17]:
#Creation of the column Hour
AccidentsDataCasualtiesHour = AccidentsDataCasualtiesTime.withColumn("Hour", split(col("Time"), ":").getItem(0))
#Display using Pandas
AccidentsDataCasualtiesHour.limit(5).toPandas()

                                                                                

Unnamed: 0,id,borough,date,lat,lon,location,accident_severity,age,ageBand,class,mode,severity,Time,Hour
0,739028,Harrow,2019-11-20T14:50:00Z,51.579375,-0.341793,On Bessborough Road Near The Junction With Low...,Slight,35.0,Adult,Driver,PoweredTwoWheeler,Slight,14:50:00Z,14
1,739029,Southwark,2019-11-21T13:28:00Z,51.486978,-0.106019,On De Laune Street Near The Junction With 0,Slight,31.0,Adult,Driver,Car,Slight,13:28:00Z,13
2,739030,Sutton,2019-11-21T13:33:00Z,51.364844,-0.192255,Location Uncertain On Throwley Way Near The Ju...,Slight,21.0,Adult,Driver,PoweredTwoWheeler,Slight,13:33:00Z,13
3,739031,Havering,2019-11-21T23:40:00Z,51.544569,0.197018,On Wood Lane Near The Junction With Easedale D...,Slight,29.0,Adult,Passenger,Car,Slight,23:40:00Z,23
4,739032,Hackney,2019-11-22T02:10:00Z,51.523703,-0.077236,On Holywell Lane Near The Junction With Bethna...,Slight,44.0,Adult,Pedestrian,Pedestrian,Slight,02:10:00Z,2
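
The two `split` calls above can be checked on a single timestamp in plain Python. The sketch also shows an alternative that parses the timestamp with `datetime`, assuming every value follows the `YYYY-MM-DDTHH:MM:SSZ` layout seen in the sample rows. The string approach returns a zero-padded string, while parsing returns an integer.

```python
from datetime import datetime

def hour_by_split(timestamp):
    """Mirror the notebook: split on 'T', then on ':' and keep the first part."""
    return timestamp.split("T")[1].split(":")[0]

def hour_by_parsing(timestamp):
    """Alternative: parse the whole timestamp and read the hour as an integer."""
    return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ").hour

ts = "2019-11-22T02:10:00Z"  # timestamp taken from the sample rows
print(hour_by_split(ts), hour_by_parsing(ts))
```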


The Dataframe provides a column called *ageBand* that indicates whether the person involved is an adult or a child. We want to be more specific, so we add three new age categories: Teenagers, Young Adults and Elderly.

We drop the given *ageBand* column and create a new one called *age_category*.

In [18]:
#Drop ageBand column
AccidentsDataCasualtiesHour = AccidentsDataCasualtiesHour.drop(col("ageBand"))

In [19]:
from pyspark.sql.functions import when, lit

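#Creation of the column age_category
#Note: rows with a null age match none of the when() conditions, so otherwise() labels them "Elderly"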
AccidentsDataCasualtiesHourAgeCat = AccidentsDataCasualtiesHour.withColumn("age_category", \
                                                          when((AccidentsDataCasualtiesHour.age <= 12), lit("Children"))\
                                                         .when((AccidentsDataCasualtiesHour.age > 12) & \
                                                               (AccidentsDataCasualtiesHour.age < 18), lit("Teenagers"))\
                                                         .when((AccidentsDataCasualtiesHour.age >= 18) & \
                                                               (AccidentsDataCasualtiesHour.age < 26), lit("Young Adults"))\
                                                         .when((AccidentsDataCasualtiesHour.age >= 26) & \
                                                               (AccidentsDataCasualtiesHour.age < 65), lit("Adults"))\
                                                         .otherwise(lit("Elderly"))\
                                                          )
AccidentsDataCasualtiesHourAgeCat.limit(5).toPandas()

                                                                                

Unnamed: 0,id,borough,date,lat,lon,location,accident_severity,age,class,mode,severity,Time,Hour,age_category
0,739028,Harrow,2019-11-20T14:50:00Z,51.579375,-0.341793,On Bessborough Road Near The Junction With Low...,Slight,35.0,Driver,PoweredTwoWheeler,Slight,14:50:00Z,14,Adults
1,739029,Southwark,2019-11-21T13:28:00Z,51.486978,-0.106019,On De Laune Street Near The Junction With 0,Slight,31.0,Driver,Car,Slight,13:28:00Z,13,Adults
2,739030,Sutton,2019-11-21T13:33:00Z,51.364844,-0.192255,Location Uncertain On Throwley Way Near The Ju...,Slight,21.0,Driver,PoweredTwoWheeler,Slight,13:33:00Z,13,Young Adults
3,739031,Havering,2019-11-21T23:40:00Z,51.544569,0.197018,On Wood Lane Near The Junction With Easedale D...,Slight,29.0,Passenger,Car,Slight,23:40:00Z,23,Adults
4,739032,Hackney,2019-11-22T02:10:00Z,51.523703,-0.077236,On Holywell Lane Near The Junction With Bethna...,Slight,44.0,Pedestrian,Pedestrian,Slight,02:10:00Z,2,Adults
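
The age boundaries used in the `when` chain can be written as an ordinary Python function, which makes the edge cases easy to check. This is a sketch, not the notebook's code: `age_category` is a hypothetical helper, and returning `None` for a missing age is an assumption of this sketch. In the Spark expression, a null age fails every `when` condition and therefore ends up in the `otherwise` branch.

```python
def age_category(age):
    """Map an age in years to the categories used in this section."""
    if age is None:
        return None  # assumption: keep missing ages out of every category
    if age <= 12:
        return "Children"
    if age < 18:
        return "Teenagers"
    if age < 26:
        return "Young Adults"
    if age < 65:
        return "Adults"
    return "Elderly"

# Check the values on both sides of each boundary
for sample in (12, 13, 17, 18, 25, 26, 64, 65, None):
    print(sample, age_category(sample))
```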


With all the new columns, we create a pivot table grouping by *Hour* and using the *age_category* as columns for the accidents marked as fatal.

In [20]:
from pyspark.sql.functions import col

#Creation of the pivot table
PivotTableFatalAccidentsHourAgeCat = AccidentsDataCasualtiesHourAgeCat.filter(col("severity") == "Fatal")\
                                                    .groupBy("Hour")\
                                                    .pivot("age_category")\
                                                    .agg(countDistinct("id"))\
                                                    .orderBy(col("Hour"))

#Save that pivot table as a Pandas dataframe
PivotTableFatalAccidentsHourAgeCatPandas = PivotTableFatalAccidentsHourAgeCat.toPandas()
PivotTableFatalAccidentsHourAgeCatPandas.fillna(0)


                                                                                

Unnamed: 0,Hour,Adults,Children,Elderly,Teenagers,Young Adults
0,0,4.0,0.0,2.0,0.0,10.0
1,1,2.0,0.0,2.0,0.0,6.0
2,2,2.0,0.0,0.0,0.0,0.0
3,3,6.0,0.0,2.0,0.0,4.0
4,4,4.0,0.0,0.0,0.0,2.0
5,5,2.0,0.0,0.0,0.0,0.0
6,6,2.0,0.0,0.0,2.0,0.0
7,7,6.0,0.0,0.0,0.0,0.0
8,8,10.0,0.0,0.0,0.0,0.0
9,9,4.0,0.0,2.0,0.0,0.0


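For this analysis, we convert the counts into probabilities: each age-category column is divided by its total, so each value is the share of that category's fatal accidents that occurred at a given hour.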

In [21]:
#We divide each column by the total number of fatal accidents for that age category to obtain probabilities
age_categories = ["Children", "Teenagers", "Young Adults", "Adults", "Elderly"]
for age_category in age_categories:
    PivotTableFatalAccidentsHourAgeCatPandas[age_category] = PivotTableFatalAccidentsHourAgeCatPandas[age_category] \
                                                        / PivotTableFatalAccidentsHourAgeCatPandas[age_category].sum()
#Display the pivot table
PivotTableFatalAccidentsHourAgeCatPandas\
                                        .loc[:,["Hour"] + age_categories]\
                                        .fillna(0).set_index("Hour")

Hour,Children,Teenagers,Young Adults,Adults,Elderly
0,0.0,0.0,0.238095,0.033333,0.030303
1,0.0,0.0,0.142857,0.016667,0.030303
2,0.0,0.0,0.0,0.016667,0.0
3,0.0,0.0,0.095238,0.05,0.030303
4,0.0,0.0,0.047619,0.033333,0.0
5,0.0,0.0,0.0,0.016667,0.0
6,0.0,0.333333,0.0,0.016667,0.0
7,0.0,0.0,0.0,0.05,0.0
8,0.0,0.0,0.0,0.083333,0.0
9,0.0,0.0,0.0,0.033333,0.030303


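Lastly, we extract the hour with the highest share of fatal accidents for each age category. The number printed on the left is the row index of the pivot table, which matches the hour as long as every hour of the day appears in the table.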

In [22]:
#Extracting the hour with the highest probability of fatal accident for each age category.
print("Children")
print(PivotTableFatalAccidentsHourAgeCatPandas["Children"].fillna(0).nlargest(1, keep = "all"))
print(" ")
print("Teenagers")
print(PivotTableFatalAccidentsHourAgeCatPandas["Teenagers"].fillna(0).nlargest(1, keep = "all"))
print(" ")
print("Young Adults")
print(PivotTableFatalAccidentsHourAgeCatPandas["Young Adults"].fillna(0).nlargest(1, keep = "all"))
print(" ")
print("Adults")
print(PivotTableFatalAccidentsHourAgeCatPandas["Adults"].fillna(0).nlargest(1, keep = "all"))
print(" ")
print("Elderly")
print(PivotTableFatalAccidentsHourAgeCatPandas["Elderly"].fillna(0).nlargest(1, keep = "all"))

Children
16    0.25
18    0.25
19    0.25
20    0.25
Name: Children, dtype: float64
 
Teenagers
19    0.666667
Name: Teenagers, dtype: float64
 
Young Adults
0    0.238095
Name: Young Adults, dtype: float64
 
Adults
8     0.083333
18    0.083333
Name: Adults, dtype: float64
 
Elderly
17    0.151515
Name: Elderly, dtype: float64
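
The last two steps, normalising a column and picking its peak hour, can be sketched in plain Python for a single age category. The counts are made up and both helpers are hypothetical. `peak_hours` returns every hour tied for the maximum, which is what `nlargest(1, keep="all")` does in the cell above.

```python
def column_shares(counts_by_hour):
    """Divide every count by the column total so the shares sum to 1."""
    total = sum(counts_by_hour.values())
    if total == 0:
        return {hour: 0.0 for hour in counts_by_hour}
    return {hour: count / total for hour, count in counts_by_hour.items()}

def peak_hours(shares):
    """Return every hour tied for the largest share."""
    top = max(shares.values())
    return [hour for hour, share in shares.items() if share == top]

# Made-up fatal-accident counts per hour for one age category
counts = {0: 2, 8: 5, 17: 3, 18: 5, 23: 5}
shares = column_shares(counts)
print(shares)
print(peak_hours(shares))
```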


<a id='3.4'></a>
### 3.4 Insight IV: For each severity, the type of vehicle with the highest probability of accident

In this analysis, we determine which type of vehicle has the highest probability for each of the three accident severities: fatal, serious and slight.

First, we join the vehicles table with the table that holds the general information about each accident and the people involved. From the joined table, we only need the accident id, the accident severity and the vehicle type.

To do so, we use Spark SQL again.

In [23]:
#SQL view for the vehicles dataframe
AccidentsDataVehicles.createOrReplaceTempView("AccidentsDataVehicles_sql_view")

#SQL statement to join the tables.
AccidentsDatavehiclesSevType = spark.sql("""SELECT DISTINCT a.id, a.accident_severity, b.type
             FROM AccidentsDataCasualties_sql_view a INNER JOIN AccidentsDataVehicles_sql_view b ON (a.id = b.id)
             """)

#Showing the result table
AccidentsDatavehiclesSevType.toPandas().head()

                                                                                

Unnamed: 0,id,accident_severity,type
0,741114,Slight,Motorcycle_50_125cc
1,718668,Fatal,Car
2,738937,Slight,Car
3,361940,Serious,PedalCycle
4,729923,Slight,Car
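
The effect of `SELECT DISTINCT` on this join is worth spelling out: both views hold several rows per accident, so the join multiplies them, and `DISTINCT` then collapses the result to one row per (id, severity, vehicle type). As a consequence, an accident involving two cars contributes a single `Car` row. The plain-Python sketch below shows this on made-up rows. `accident_vehicle_pairs` is a hypothetical helper, and it assumes every accident id carries a single accident severity.

```python
def accident_vehicle_pairs(casualty_rows, vehicle_rows):
    """Inner-join on id and keep distinct (id, accident_severity, type) triples."""
    # Assumption: all casualty rows of one accident share the same accident_severity
    severity_by_id = {row["id"]: row["accident_severity"] for row in casualty_rows}
    return sorted({
        (vehicle["id"], severity_by_id[vehicle["id"]], vehicle["type"])
        for vehicle in vehicle_rows
        if vehicle["id"] in severity_by_id  # inner join: drop unmatched vehicles
    })

casualties = [  # made-up rows, one per casualty
    {"id": 1, "accident_severity": "Slight"},
    {"id": 1, "accident_severity": "Slight"},
    {"id": 2, "accident_severity": "Fatal"},
]
vehicles = [  # made-up rows, one per vehicle
    {"id": 1, "type": "Car"},
    {"id": 1, "type": "Car"},         # two cars in the same accident
    {"id": 2, "type": "PedalCycle"},
    {"id": 3, "type": "Taxi"},        # no casualty row, dropped by the join
]
pairs = accident_vehicle_pairs(casualties, vehicles)
print(pairs)
```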


Once we have this new table, we are going to create a pivot table grouping by vehicle type and using as columns the different severities of the accidents.

In [24]:
AccidentsDataVehiclesSevTypeGroupByVehType = AccidentsDatavehiclesSevType.groupBy("type")\
                                     .pivot("accident_severity")\
                                     .agg(countDistinct("id")).toPandas()

AccidentsDataVehiclesSevTypeGroupByVehType.fillna(0)

                                                                                

Unnamed: 0,type,Fatal,Serious,Slight
0,HeavyGoodsVehicle,18.0,116,508
1,OtherMotorVehicle,8.0,184,956
2,RiddenHorse,0.0,2,8
3,Car,154.0,4912,33672
4,Taxi,4.0,512,3618
5,Motorcycle_500cc_Plus,38.0,546,1392
6,BusOrCoach,28.0,438,2560
7,PedalCycle,14.0,1692,7910
8,Motorcycle_0_50cc,0.0,196,834
9,LightGoodsVehicle,18.0,778,4546


Having obtained this new table, we transform the counts into probabilities by dividing each row by its total, so that the three values for a vehicle type sum to 1.

In [25]:
#Calculating the probabilities that each vehicle type has for the three severity types
cols=['Slight','Fatal','Serious']
AccidentsDataVehiclesSevTypeGroupByVehType['total'] = AccidentsDataVehiclesSevTypeGroupByVehType[cols].sum(axis=1)

AccidentsDataVehiclesSevTypeGroupByVehType["Fatal%"] = AccidentsDataVehiclesSevTypeGroupByVehType['Fatal']\
                                                        /AccidentsDataVehiclesSevTypeGroupByVehType["total"]

AccidentsDataVehiclesSevTypeGroupByVehType["Serious%"] = AccidentsDataVehiclesSevTypeGroupByVehType['Serious']\
                                                          /AccidentsDataVehiclesSevTypeGroupByVehType["total"]

AccidentsDataVehiclesSevTypeGroupByVehType["Slight%"] = AccidentsDataVehiclesSevTypeGroupByVehType['Slight']\
                                                         /AccidentsDataVehiclesSevTypeGroupByVehType["total"]

AccidentsDataVehiclesSevTypeGroupByVehType = AccidentsDataVehiclesSevTypeGroupByVehType\
                                              .drop(['Fatal','Serious','Slight','total'],axis=1)

#Display using Pandas
AccidentsDataVehiclesSevTypeGroupByVehType.fillna(0).sort_values("Fatal%", ascending = False)

Unnamed: 0,type,Fatal%,Serious%,Slight%
0,HeavyGoodsVehicle,0.028037,0.180685,0.791277
5,Motorcycle_500cc_Plus,0.019231,0.276316,0.704453
14,MediumGoodsVehicle,0.013699,0.164384,0.821918
6,BusOrCoach,0.009253,0.144746,0.846001
1,OtherMotorVehicle,0.006969,0.160279,0.832753
11,Motorcycle_125_500cc,0.004969,0.228571,0.76646
3,Car,0.003975,0.126801,0.869224
9,LightGoodsVehicle,0.00337,0.145638,0.850992
10,Motorcycle_50_125cc,0.002786,0.162674,0.83454
7,PedalCycle,0.001456,0.175957,0.822587


Lastly, we extract the vehicle type with the highest probability for each of the three severities.

In [26]:
print(AccidentsDataVehiclesSevTypeGroupByVehType.set_index("type")["Fatal%"].fillna(0).nlargest(1))
print(" ")
print(AccidentsDataVehiclesSevTypeGroupByVehType.set_index("type")["Serious%"].fillna(0).nlargest(1))
print(" ")
print(AccidentsDataVehiclesSevTypeGroupByVehType.set_index("type")["Slight%"].fillna(0).nlargest(1))

type
HeavyGoodsVehicle    0.028037
Name: Fatal%, dtype: float64
 
type
Motorcycle_500cc_Plus    0.276316
Name: Serious%, dtype: float64
 
type
Taxi    0.875181
Name: Slight%, dtype: float64
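
The row-wise normalisation can be verified by hand for two vehicle types, using the counts shown in the pivot table of this section. The sketch below is plain Python with a hypothetical helper, `severity_shares`. Because it only includes two vehicle types, the "top" vehicle it prints per severity is not comparable with the full result.

```python
def severity_shares(counts):
    """Turn per-severity counts for one vehicle type into shares that sum to 1."""
    total = sum(counts.values())
    return {severity: n / total for severity, n in counts.items()}

# Counts copied from the pivot table in this section for two vehicle types
pivot = {
    "HeavyGoodsVehicle": {"Fatal": 18, "Serious": 116, "Slight": 508},
    "Taxi": {"Fatal": 4, "Serious": 512, "Slight": 3618},
}
shares = {vehicle: severity_shares(c) for vehicle, c in pivot.items()}

# For each severity, pick the vehicle type with the largest share
for severity in ("Fatal", "Serious", "Slight"):
    top = max(shares, key=lambda vehicle: shares[vehicle][severity])
    print(severity, top, round(shares[top][severity], 6))
```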


<a id='4'></a>
## 4. Tear Down

Don't forget to stop the services used for this notebook.

<a id='4.1'></a>
### 4.1 Stop Hadoop

Open a terminal and execute the following command:
```sh
hadoop-stop.sh
```