# pyspark

## Reading in the data from SQL

In [1]:
import pandas as pd
import pyspark

In [2]:
from   io           import StringIO
import os
import sys
import warnings

import matplotlib.pyplot as plt

import requests
import seaborn as sns

warnings.filterwarnings('ignore')

%pylab inline
%config InlineBackend.figure_formats = ['retina']

Populating the interactive namespace from numpy and matplotlib


In [3]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

21/12/12 10:08:59 WARN Utils: Your hostname, Amirs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.63 instead (on interface en0)
21/12/12 10:08:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/12/12 10:09:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
fullpath = 'us_accide_2019.csv'
    
# first line of file
! head -n 1 {fullpath}

ID,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance(mi),Description,Number,Street,Side,City,County,State,Zipcode,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature(F),Wind_Chill(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Precipitation(in),Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight


In [5]:
# first 5 lines of file 
! head -n 5 {fullpath}

ID,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance(mi),Description,Number,Street,Side,City,County,State,Zipcode,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature(F),Wind_Chill(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Precipitation(in),Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight
A-2982119,4,2019-10-19 04:30:00.000000000,2020-11-01 00:00:00.000000000,38.87554,-77.280396,38.87909,-77.253206,1.483,On Rt. 6731 (Vaden Dr) in the County of Fairfax at Country Creek Rd; Virginia Center Bv; Rt. 6154E/W (Fairfax County); Rt. 6154E (Fairfax County) motorists can expect potential delays in this area from 10/18/19 at 9:30 PM until 10/31/20 at 5:00 PM due to construction activities. All east lanes are closed. All west lanes are closed.,,I-66 E,R,Fairfax,Fairfax Count

In [6]:
columns = ['ID','Severity','Start_Time','End_Time','Start_Lat','Start_Lng','End_Lat','End_Lng','Distance(mi)',
               'Description','Number','Street','Side','City','County','State','Zipcode','Country','Timezone','Airport_Code',
               'Weather_Timestamp','Temperature(F)','Wind_Chill(F)','Humidity(%)','Pressure(in)','Visibility(mi)',
               'Wind_Direction','Wind_Speed(mph)','Precipitation(in)','Weather_Condition','Amenity','Bump','Crossing',
               'Give_Way','Junction','No_Exit','Railway','Roundabout','Station','Stop','Traffic_Calming','Traffic_Signal',
               'Turning_Loop','Sunrise_Sunset','Civil_Twilight','Nautical_Twilight','Astronomical_Twilight']

In [7]:
# read.csv is very similar to the Pandas version
data = spark.read.csv(fullpath,
                     sep=',',
                     inferSchema=True,
                     header=True)

                                                                                

In [8]:
data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- End_Time: string (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Number: double (nullable = true)
 |-- Street: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: string (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): doub

In [9]:
data.head()

21/12/12 10:09:54 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Row(ID='A-2982119', Severity=4, Start_Time='2019-10-19 04:30:00.000000000', End_Time='2020-11-01 00:00:00.000000000', Start_Lat=38.87554, Start_Lng=-77.280396, End_Lat=38.87909, End_Lng=-77.253206, Distance(mi)=1.483, Description='On Rt. 6731 (Vaden Dr) in the County of Fairfax at Country Creek Rd; Virginia Center Bv; Rt. 6154E/W (Fairfax County); Rt. 6154E (Fairfax County) motorists can expect potential delays in this area from 10/18/19 at 9:30 PM until 10/31/20 at 5:00 PM due to construction activities. All east lanes are closed. All west lanes are closed.', Number=None, Street='I-66 E', Side='R', City='Fairfax', County='Fairfax County', State='VA', Zipcode='22031', Country='US', Timezone='US/Eastern', Airport_Code='KDAA', Weather_Timestamp='2019-10-19 04:56:00', Temperature(F)=37.0, Wind_Chill(F)=37.0, Humidity(%)=92.0, Pressure(in)=30.02, Visibility(mi)=10.0, Wind_Direction='CALM', Wind_Speed(mph)=0.0, Precipitation(in)=0.0, Weather_Condition='Fair', Amenity=False, Bump=False, Cros

In [10]:
# function to select a few rows of data, convert to a Pandas dataframe, and transpose
def preview(df, n=3):
    return pd.DataFrame(df.take(n), columns=df.columns).T

preview(data)

Unnamed: 0,0,1,2
ID,A-2982119,A-3226009,A-3318645
Severity,4,4,4
Start_Time,2019-10-19 04:30:00.000000000,2019-12-17 09:12:52,2019-10-19 04:30:00
End_Time,2020-11-01 00:00:00.000000000,2020-10-14 07:07:49,2020-11-01 00:00:00
Start_Lat,38.87554,33.634089,38.87554
Start_Lng,-77.280396,-112.106196,-77.280396
End_Lat,38.87909,33.636297,38.87909
End_Lng,-77.253206,-112.104149,-77.253206
Distance(mi),1.483,0.193,1.483
Description,On Rt. 6731 (Vaden Dr) in the County of Fairfa...,Closed road from W Grandview Rd / N Garrin Dr ...,On Rt. 6731 (Vaden Dr) in the County of Fairfa...


## Data Exploration

In [11]:
print((data.count(), len(data.columns)))

(261772, 47)


In [12]:
data.summary().show()

[Stage 7:>                                                          (0 + 1) / 1]

+-------+---------+------------------+-------------------+--------------------+------------------+------------------+------------------+-------------------+-------------------+--------------------+------------------+-----------------+------+----------+------+------+------------------+-------+----------+------------+-------------------+------------------+------------------+-----------------+------------------+-----------------+--------------+------------------+--------------------+------------------+--------------+--------------+-----------------+---------------------+
|summary|       ID|          Severity|         Start_Time|            End_Time|         Start_Lat|         Start_Lng|           End_Lat|            End_Lng|       Distance(mi)|         Description|            Number|           Street|  Side|      City|County| State|           Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|    Temperature(F)|     Wind_Chill(F)|      Humidity(%)|      Pressure(in)|   Visibility

                                                                                

## Data Exploration ( Nulls and Counts)

1) Finding the count of Nulls in columns in Spark

2) Counts of strings for dummies

In [13]:
from pyspark.sql.functions import isnan, when, count, col

In [14]:
import pyspark.sql.functions as F
def count_missings(spark_df,sort=True):
    """
    Counts number of nulls and nans in each column
    """
    df = spark_df.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for (c,c_type) in spark_df.dtypes if c_type not in ('timestamp', 'string', 'date','boolean', 'binary')]).toPandas()

    if len(df) == 0:
        print("There are no any missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count",ascending=False)

    return df

In [15]:
count_missings(data)

                                                                                

Unnamed: 0,count
Number,194942
Precipitation(in),54979
Wind_Chill(F),33485
Wind_Speed(mph),17302
Humidity(%),7535
Temperature(F),7255
Visibility(mi),7190
Pressure(in),5800
Severity,0
Start_Lat,0


In [16]:
#To perform SQL queries, the tables have to be registered using the registerTempTable method.
data.registerTempTable('data')

In [18]:
avg_sev = spark.sql(r"""SELECT AVG(Severity) AS avrage_severity 
                            FROM data
                            """)
avg_sev.show()

+-----------------+
|  avrage_severity|
+-----------------+
|2.303665021469065|
+-----------------+



In [56]:
severity_counts = spark.sql(r"""SELECT Severity, COUNT(Severity) AS total 
                                     FROM data 
                                     GROUP BY Severity 
                                     ORDER BY total desc 
                                     """)
severity_counts.show()



+--------+------+
|Severity| total|
+--------+------+
|       2|206659|
|       3| 30735|
|       4| 24378|
+--------+------+



                                                                                

In [57]:
# Timezone 
timezone_counts = spark.sql(r"""SELECT Timezone, COUNT(Timezone) AS total 
                                     FROM data 
                                     GROUP BY Timezone 
                                     ORDER BY total desc 
                                     """)
timezone_counts.show()




+-----------+------+
|   Timezone| total|
+-----------+------+
| US/Pacific|147135|
| US/Eastern| 53504|
| US/Central| 34023|
|US/Mountain| 26341|
|       null|     0|
+-----------+------+



                                                                                

In [19]:
state_counts = spark.sql(r"""SELECT State, COUNT(state) AS total 
                                     FROM data 
                                     GROUP BY state 
                                     ORDER BY total desc 
                                     """)
state_counts.show()



+-----+------+
|State| total|
+-----+------+
|   CA|108309|
|   OR| 35121|
|   UT| 15914|
|   MN| 11329|
|   FL|  9972|
|   NY|  6925|
|   TX|  6737|
|   IL|  6447|
|   CO|  6088|
|   GA|  5386|
|   MI|  5032|
|   WA|  4455|
|   VA|  4246|
|   NJ|  3744|
|   MD|  3654|
|   AZ|  2715|
|   NC|  2644|
|   IN|  1803|
|   MO|  1691|
|   TN|  1671|
+-----+------+
only showing top 20 rows





In [20]:
#NOTE: Since we have the column with Timezone, no need to catagorize the states for dummies!

In [21]:
# what side of the road are accidents happening?

Side_counts = spark.sql(r"""SELECT Side, COUNT(Side) AS total 
                                     FROM data 
                                     GROUP BY Side 
                                     ORDER BY total desc 
                                     """)
Side_counts.show()

+----+------+
|Side| total|
+----+------+
|   R|228209|
|   L| 33563|
+----+------+



In [22]:
timezone_counts = spark.sql(r"""SELECT Timezone, COUNT(Timezone) AS total 
                                     FROM data 
                                     GROUP BY Timezone 
                                     ORDER BY total desc 
                                     """)
timezone_counts.show()

+-----------+------+
|   Timezone| total|
+-----------+------+
| US/Pacific|147135|
| US/Eastern| 53504|
| US/Central| 34023|
|US/Mountain| 26341|
|       null|     0|
+-----------+------+



In [23]:
airportcode_counts = spark.sql(r"""SELECT Airport_Code, COUNT(Airport_Code) AS total 
                                     FROM data 
                                     GROUP BY Airport_Code 
                                     ORDER BY total desc 
                                     """)
airportcode_counts.show()

[Stage 18:>                                                         (0 + 4) / 4]

+------------+-----+
|Airport_Code|total|
+------------+-----+
|        KCQT| 7088|
|        KU42| 4541|
|        KFUL| 4236|
|        KEMT| 4073|
|        KSNA| 3881|
|        KPDX| 3847|
|        KHHR| 3488|
|        KMCC| 3044|
|        KSBD| 3006|
|        KOAK| 2939|
|        KCCR| 2889|
|        KSAC| 2871|
|        KPOC| 2860|
|        KEUG| 2847|
|        KSLC| 2841|
|        KRAL| 2679|
|        KLGB| 2602|
|        KSJC| 2577|
|        KONT| 2452|
|        KMDW| 2122|
+------------+-----+
only showing top 20 rows





In [24]:
wind_dir_counts = spark.sql(r"""SELECT Wind_Direction, COUNT(Wind_Direction) AS total 
                                     FROM data 
                                     GROUP BY Wind_Direction 
                                     ORDER BY total desc 
                                     """)
wind_dir_counts.show(40)



+--------------+-----+
|Wind_Direction|total|
+--------------+-----+
|          CALM|42293|
|             W|18306|
|           WNW|14583|
|             S|14104|
|            NW|13957|
|           WSW|12445|
|           NNW|11774|
|           VAR|11382|
|           SSW|11340|
|           SSE|10947|
|            SW|10776|
|             N|10163|
|            SE| 9316|
|             E| 9271|
|          Calm| 8601|
|           ESE| 8214|
|           ENE| 7407|
|           NNE| 6662|
|            NE| 6654|
|         North| 4003|
|         South| 3723|
|          West| 3438|
|          East| 2968|
|      Variable| 1523|
|          null|    0|
+--------------+-----+



                                                                                

In [27]:
city_accident_counts = spark.sql(r"""SELECT City, COUNT(City) AS total 
                                     FROM data 
                                     GROUP BY City 
                                     ORDER BY total desc 
                                     """)
city_accident_counts.show(10)

[Stage 26:>                                                         (0 + 4) / 4]

+--------------+-----+
|          City|total|
+--------------+-----+
|   Los Angeles| 9380|
|      Portland| 5317|
|    Sacramento| 4916|
|Salt Lake City| 3719|
|     San Diego| 3516|
|   Minneapolis| 2610|
|      San Jose| 2485|
|       Atlanta| 2198|
|    Saint Paul| 2159|
|        Denver| 2137|
+--------------+-----+
only showing top 10 rows





In [55]:
# What are the weather conditions at the time of accicent occurence
weather_conditions = spark.sql(r"""SELECT Weather_Condition, COUNT(Weather_Condition) AS total 
                                     FROM data 
                                     GROUP BY Weather_Condition 
                                     ORDER BY total desc 
                                     """)

weather_conditions.show()



+--------------------+-----+
|   Weather_Condition|total|
+--------------------+-----+
|                Fair|99281|
|              Cloudy|31830|
|       Mostly Cloudy|30455|
|       Partly Cloudy|27659|
|               Clear|13402|
|            Overcast|12153|
|          Light Rain|11997|
|          Light Snow| 5711|
|                Haze| 3273|
|                 Fog| 3263|
|    Scattered Clouds| 3125|
|                Rain| 2779|
|        Fair / Windy| 1307|
|          Heavy Rain|  983|
|                Snow|  838|
|      Cloudy / Windy|  586|
|       Light Drizzle|  535|
|Mostly Cloudy / W...|  473|
|Partly Cloudy / W...|  402|
|Thunder in the Vi...|  368|
+--------------------+-----+
only showing top 20 rows



                                                                                

### Preprocessing 

In [77]:
# making a copy of the pyspark dataframe - need to take it to pandas and then copy and bring back
schema = data.schema
X_pd = data.toPandas()
data_cp = spark.createDataFrame(X_pd,schema=schema)
del X_pd

                                                                                

In [78]:
# Checking for the correct copy
#preview(data_cp)

In [79]:
print(data.columns)

['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street', 'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone', 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight']


In [80]:
'''#Nulls in all the data
Dict_Null_data = {col:data.filter(isnan(data[col[0]])).count() for col in data.dtypes if col[1] != 'boolean'}
Dict_Null_data'''

"#Nulls in all the data\nDict_Null_data = {col:data.filter(isnan(data[col[0]])).count() for col in data.dtypes if col[1] != 'boolean'}\nDict_Null_data"

### deleting unncessary columns...

In [63]:
from pyspark.sql.functions import *

In [81]:
cols_to_drop = ['ID','Start_Time','End_Time','Description','Street',
                'Weather_Timestamp','Zipcode','County','City','Airport_Code','Precipitation(in)'] 
# Weather_Condition to be processed again after cleaning

In [82]:
data_cp = data_cp.drop(*cols_to_drop)

In [83]:
print(data_cp.columns)

['Severity', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Number', 'Side', 'State', 'Country', 'Timezone', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight']


In [88]:
#Dict_Null = {col:data_cp.filter(isnan(data_cp[col[0]])).count() for col in data_cp.dtypes if col[1] != 'boolean'}
#Dict_Null

In [85]:
# Replace nan with Nulls
columns = data_cp.dtypes
for cols, typ in columns:
    if typ != 'boolean':
        data_cp = data_cp.withColumn(cols,when(isnan(col(cols)),None).otherwise(col(cols)))

In [87]:
#Dict_Null = {col:data_cp.filter(isnan(data_cp[col[0]])).count() for col in data_cp.dtypes if col[1] != 'boolean'}
#Dict_Null

In [89]:
label = 'Severity'
string_cols =  [cols[0] for cols in data_cp.dtypes if cols[1] == "string" ]
num_cols = [cols[0] for cols in data_cp.dtypes if cols[1] == "int" or cols[1] == "double" ]
num_cols.remove(label)
bool_cols = [cols[0] for cols in data_cp.dtypes if cols[1] == "boolean"]

# string_cols,
# num_cols
# bool_cols

In [90]:
data_cp = data_cp.fillna("unknown",string_cols)
data_cp = data_cp.fillna(0,num_cols)

In [91]:
%%time 
Dict_Null3 = {col:data_cp.filter(isnull(data_cp[col[0]])).count() for col in data_cp.dtypes }
Dict_Null3

21/12/12 12:04:39 WARN TaskSetManager: Stage 299 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:04:46 WARN TaskSetManager: Stage 337 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:04:48 WARN TaskSetManager: Stage 339 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:04:50 WARN TaskSetManager: Stage 341 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:04:51 WARN TaskSetManager: Stage 343 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:04:53 WARN TaskSetManager: Stage 345 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:04:55 WARN TaskSetManager: Stage 347 contains a task of very large size (23798 KiB). The maximum recommended task size is 10

CPU times: user 61.4 ms, sys: 31.8 ms, total: 93.2 ms
Wall time: 32.1 s


                                                                                

{('Severity', 'int'): 0,
 ('Start_Lat', 'double'): 0,
 ('Start_Lng', 'double'): 0,
 ('End_Lat', 'double'): 0,
 ('End_Lng', 'double'): 0,
 ('Distance(mi)', 'double'): 0,
 ('Number', 'double'): 0,
 ('Side', 'string'): 0,
 ('State', 'string'): 0,
 ('Country', 'string'): 0,
 ('Timezone', 'string'): 0,
 ('Temperature(F)', 'double'): 0,
 ('Wind_Chill(F)', 'double'): 0,
 ('Humidity(%)', 'double'): 0,
 ('Pressure(in)', 'double'): 0,
 ('Visibility(mi)', 'double'): 0,
 ('Wind_Direction', 'string'): 0,
 ('Wind_Speed(mph)', 'double'): 0,
 ('Weather_Condition', 'string'): 0,
 ('Amenity', 'boolean'): 0,
 ('Bump', 'boolean'): 0,
 ('Crossing', 'boolean'): 0,
 ('Give_Way', 'boolean'): 0,
 ('Junction', 'boolean'): 0,
 ('No_Exit', 'boolean'): 0,
 ('Railway', 'boolean'): 0,
 ('Roundabout', 'boolean'): 0,
 ('Station', 'boolean'): 0,
 ('Stop', 'boolean'): 0,
 ('Traffic_Calming', 'boolean'): 0,
 ('Traffic_Signal', 'boolean'): 0,
 ('Turning_Loop', 'boolean'): 0,
 ('Sunrise_Sunset', 'string'): 0,
 ('Civil_Twil

In [92]:
for c in bool_cols:
    data_cp = data_cp.withColumn(c,col(c).cast("integer"))

# Train- Test Split - Severity

In [95]:
(train, test) = data_cp.randomSplit([0.7, 0.3])
train.count(), test.count()

21/12/12 12:12:46 WARN TaskSetManager: Stage 375 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:12:53 WARN TaskSetManager: Stage 377 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

(183405, 78367)

### Modeling 

In [97]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [98]:
sIndexer = [StringIndexer(inputCol=cols, outputCol=cols+"Index") for cols in  string_cols]

In [99]:
assembler = VectorAssembler(inputCols=[s.getOutputCol() for s in sIndexer]+bool_cols+num_cols, outputCol='features')

## Logistic Regression

In [100]:
lr = LogisticRegression(featuresCol="features", labelCol=label)

In [101]:
pipeline = Pipeline(stages=sIndexer+[assembler, lr])
model = pipeline.fit(train)

21/12/12 12:16:50 WARN TaskSetManager: Stage 379 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:16:58 WARN TaskSetManager: Stage 381 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:17:06 WARN TaskSetManager: Stage 383 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:17:14 WARN TaskSetManager: Stage 385 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:17:22 WARN TaskSetManager: Stage 387 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:17:32 WARN TaskSetManager: Stage 389 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:17:41 WARN TaskSetManager: Stage 391 contains a task of very large size (23798 KiB). The maximum recommended task size is 10

21/12/12 12:18:46 WARN TaskSetManager: Stage 445 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:18:47 WARN TaskSetManager: Stage 446 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:18:47 WARN TaskSetManager: Stage 447 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:18:47 WARN TaskSetManager: Stage 448 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:18:48 WARN TaskSetManager: Stage 449 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:18:48 WARN TaskSetManager: Stage 450 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:18:48 WARN TaskSetManager: Stage 451 contains a task of very large size (23798 KiB). The maximum recommended task size is 10

21/12/12 12:19:06 WARN TaskSetManager: Stage 502 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:19:07 WARN TaskSetManager: Stage 503 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:19:07 WARN TaskSetManager: Stage 504 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:19:08 WARN TaskSetManager: Stage 505 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:19:08 WARN TaskSetManager: Stage 506 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:19:09 WARN TaskSetManager: Stage 507 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:19:09 WARN TaskSetManager: Stage 508 contains a task of very large size (23798 KiB). The maximum recommended task size is 10

In [102]:
preds = model.transform(train)
print("Prediction")
preds.select("Severity","prediction").show(20)

# evaluate the accuracy of the model using the test set
evaluator = MulticlassClassificationEvaluator(metricName='accuracy', labelCol="Severity")
accuracy = evaluator.evaluate(preds)

print()
print('#####################################')
print(f"Accuracy is {accuracy}")
print('#####################################')
print()

Prediction


21/12/12 12:20:04 WARN TaskSetManager: Stage 517 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------+----------+
|Severity|prediction|
+--------+----------+
|       2|       2.0|
|       2|       2.0|
|       2|       4.0|
|       2|       2.0|
|       2|       2.0|
|       2|       3.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       3.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
+--------+----------+
only showing top 20 rows



21/12/12 12:20:07 WARN TaskSetManager: Stage 518 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.


#####################################
Accuracy is 0.7983751806112156
#####################################



                                                                                

In [107]:
precision_eval = MulticlassClassificationEvaluator(labelCol='Severity', 
                                              predictionCol='prediction',
                                              metricName='weightedPrecision')
precision_ = precision.evaluate(preds)
print()
print('#####################################')
print(f"Precision is {precision_}")
print('#####################################')
print()



recall_eval = MulticlassClassificationEvaluator(labelCol='Severity', 
                                           predictionCol='prediction',
                                           metricName='weightedRecall')

recall_ = recall.evaluate(preds)
print()
print('#####################################')
print(f"Recall is {recall_}")
print('#####################################')
print()
f1_eval = MulticlassClassificationEvaluator(labelCol='Severity', 
                                       predictionCol='prediction',
                                       metricName='f1')
f1_ = f1_eval.evaluate(preds)
print()
print('#####################################')
print(f"f1 is {f1_}")
print('#####################################')
print()

21/12/12 12:40:12 WARN TaskSetManager: Stage 526 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:40:17 WARN TaskSetManager: Stage 528 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.



#####################################
Precision is 0.7384906216061682
#####################################



21/12/12 12:40:24 WARN TaskSetManager: Stage 530 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.



#####################################
Recall is 0.7983751806112157
#####################################



[Stage 530:>                                                        (0 + 4) / 4]


#####################################
f1 is 0.7411126478208241
#####################################



                                                                                

## Decision Tree

In [109]:
from pyspark.ml.classification import (DecisionTreeClassifier, 
                                      RandomForestClassifier, 
                                      GBTClassifier)

In [110]:
dt = DecisionTreeClassifier(featuresCol="features", labelCol=label)

In [111]:
pipeline = Pipeline(stages=sIndexer+[assembler, dt])
model = pipeline.fit(train)

21/12/12 12:46:20 WARN TaskSetManager: Stage 532 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:46:27 WARN TaskSetManager: Stage 534 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:46:35 WARN TaskSetManager: Stage 536 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:46:46 WARN TaskSetManager: Stage 538 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:46:54 WARN TaskSetManager: Stage 540 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:47:00 WARN TaskSetManager: Stage 542 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:47:09 WARN TaskSetManager: Stage 544 contains a task of very large size (23798 KiB). The maximum recommended task size is 10

IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) to be at least as large as the number of values in each categorical feature, but categorical feature 5 has 80 values. Consider removing this and other categorical features with a large number of values, or add more training examples.

In [112]:
preds = model.transform(train)
print("Prediction")
preds.select("Severity","prediction").show(20)

# evaluate the accuracy of the model using the test set
accuracy_eval = MulticlassClassificationEvaluator(metricName='accuracy', labelCol="Severity")
accuracy = accuracy_eval.evaluate(preds)

print()
print('#####################################')
print(f"Accuracy is {accuracy}")
print('#####################################')
print()
precision_eval = MulticlassClassificationEvaluator(labelCol='Severity', 
                                              predictionCol='prediction',
                                              metricName='weightedPrecision')
precision_ = precision.evaluate(preds)
print()
print('#####################################')
print(f"Precision is {precision_}")
print('#####################################')
print()



recall_eval = MulticlassClassificationEvaluator(labelCol='Severity', 
                                           predictionCol='prediction',
                                           metricName='weightedRecall')

recall_ = recall.evaluate(preds)
print()
print('#####################################')
print(f"Recall is {recall_}")
print('#####################################')
print()
f1_eval = MulticlassClassificationEvaluator(labelCol='Severity', 
                                       predictionCol='prediction',
                                       metricName='f1')
f1_ = f1_eval.evaluate(preds)
print()
print('#####################################')
print(f"f1 is {f1_}")
print('#####################################')
print()

Prediction


21/12/12 12:48:32 WARN TaskSetManager: Stage 556 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
[Stage 556:>                                                        (0 + 1) / 1]

+--------+----------+
|Severity|prediction|
+--------+----------+
|       2|       2.0|
|       2|       2.0|
|       2|       4.0|
|       2|       2.0|
|       2|       2.0|
|       2|       3.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       3.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
|       2|       2.0|
+--------+----------+
only showing top 20 rows



21/12/12 12:48:34 WARN TaskSetManager: Stage 557 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.
21/12/12 12:48:40 WARN TaskSetManager: Stage 559 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.



#####################################
Accuracy is 0.7983751806112156
#####################################



21/12/12 12:48:47 WARN TaskSetManager: Stage 561 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.



#####################################
Precision is 0.7384906216061682
#####################################



21/12/12 12:48:56 WARN TaskSetManager: Stage 563 contains a task of very large size (23798 KiB). The maximum recommended task size is 1000 KiB.



#####################################
Recall is 0.7983751806112157
#####################################



[Stage 563:>                                                        (0 + 4) / 4]


#####################################
f1 is 0.7411126478208241
#####################################



                                                                                

## Pandas data frame of our data:

In [None]:
data_pd_df = data.select('*').toPandas()

In [None]:
data_pd_df.head(1)

In [None]:
data_pd_df.info()

### Switching to pandas for some cleaning

In [None]:
df_copy = data_pd_df.copy()

In [None]:
df_copy['Wind_Direction'].replace('Calm', 'CALM',inplace=True)

In [None]:
df_copy['Wind_Direction'].replace('VAR', 'Variable',inplace=True)

In [None]:
#N:
df_copy['Wind_Direction'].replace('North', 'N',inplace=True)
df_copy['Wind_Direction'].replace('NNE', 'N',inplace=True)
df_copy['Wind_Direction'].replace('NNW', 'N',inplace=True)

#S:
df_copy['Wind_Direction'].replace('South', 'S',inplace=True)
df_copy['Wind_Direction'].replace('SSW', 'S',inplace=True)
df_copy['Wind_Direction'].replace('SSE', 'S',inplace=True)

#E:
df_copy['Wind_Direction'].replace('East', 'E',inplace=True)
df_copy['Wind_Direction'].replace('ESE', 'E',inplace=True)
df_copy['Wind_Direction'].replace('ENE', 'E',inplace=True)

#W:
df_copy['Wind_Direction'].replace('West', 'W',inplace=True)
df_copy['Wind_Direction'].replace('WSW', 'W',inplace=True)
df_copy['Wind_Direction'].replace('WNW', 'W',inplace=True)

In [None]:
df_copy['Wind_Direction'].replace('None', 'Variable',inplace=True)

In [None]:
df_copy['Wind_Direction'].unique()

In [None]:
df_copy.info()

### New clean DF: 

In [None]:
clean_df = df_copy.drop(['ID','Start_Time','End_Time','Start_Lat',
              'Start_Lng','End_Lat','End_Lng','Description',
              'Number','Street','City','County','State','Zipcode','Country','Airport_Code',
              'Weather_Timestamp'], axis=1, inplace=True)

In [None]:
df_copy.info()

### Repalcing nulls with mean/median values of columns

In [None]:
# Temperature(F)
df_copy['Temperature(F)'].fillna(df_copy['Temperature(F)'].median(), inplace=True)
# Wind_Chill(F)
df_copy['Wind_Chill(F)'].fillna(df_copy['Wind_Chill(F)'].median(), inplace=True)
# Humidity(%)
df_copy['Humidity(%)'].fillna(df_copy['Humidity(%)'].median(), inplace=True)
# Pressure(in)
df_copy['Pressure(in)'].fillna(df_copy['Pressure(in)'].median(), inplace=True)
# Visibility(mi)
df_copy['Visibility(mi)'].fillna(df_copy['Visibility(mi)'].median(), inplace=True)
# Wind_Speed(mph)
df_copy['Wind_Speed(mph)'].fillna(df_copy['Wind_Speed(mph)'].median(), inplace=True)
# Precipitation(in)
df_copy['Precipitation(in)'].fillna(df_copy['Precipitation(in)'].median(), inplace=True)

In [None]:
df_copy.info()

In [None]:
df_copy['Wind_Direction'].value_counts()

In [None]:
df_copy[df_copy['Wind_Direction'].isnull()]

In [None]:
# Fixing the timezone nulls

In [None]:
df_copy[df_copy['Timezone'].isnull()]

In [None]:
df_copy['Timezone'].value_counts()

In [None]:
df_copy['Timezone'].fillna('US/Pacific', inplace=True)

In [None]:
# Fixing the nulls in Weather_Condition
df_copy[df_copy['Weather_Condition'].isnull()]

In [None]:
df_copy['Weather_Condition'].value_counts()

In [None]:
df_copy['Weather_Condition'].fillna('Fair', inplace=True)

## Data Cleaning 

In [None]:
'''columns = ['ID','Severity','Start_Time','End_Time','Start_Lat','Start_Lng','End_Lat','End_Lng','Distance(mi)',
               'Description','Number','Street','Side','City','County','State','Zipcode','Country','Timezone','Airport_Code',
               'Weather_Timestamp','Temperature(F)','Wind_Chill(F)','Humidity(%)','Pressure(in)','Visibility(mi)',
               'Wind_Direction','Wind_Speed(mph)','Precipitation(in)','Weather_Condition','Amenity','Bump','Crossing',
               'Give_Way','Junction','No_Exit','Railway','Roundabout','Station','Stop','Traffic_Calming','Traffic_Signal',
               'Turning_Loop','Sunrise_Sunset','Civil_Twilight','Nautical_Twilight','Astronomical_Twilight']'''

In [None]:
from pyspark.sql.functions import udf

# columns not used in analysis
for field in ['ID','Start_Time','End_Time','Start_Lat',
              'Start_Lng','End_Lat','End_Lng','Description',
              'Number','Street','City','County','State','Zipcode','Country','Airport_Code',
              'Weather_Timestamp']:
    data = data.drop(field)

def trim(string):
    return (string
            .strip()                # string columns have spaces due to csv file format
            .replace('.','')        # remove period at end of each line
            .replace('yes','True')  # convert yes/no to True/False
            .replace('no','False'))

# udf converts a function into one that can be applied over
# a dataframe column. This is kind of like Pandas apply/map functionality.
trim = udf(trim)
    
# apply the function to every string column
for dtype in data.dtypes:
    column = dtype[0]
    if dtype[1] == 'string':
        data = data.withColumn(column, trim(data[column]))

# boolean types converted to integers
for column in ['intl_plan', 'voice_mail_plan']:
    data = data.withColumn(column, data[column].cast('boolean').cast('int'))
    
# predictors have to be of type double for many of Spark's ML models
data = data.withColumn('churned', data['churned'].cast('boolean').cast('double'))

Correlation plots of variables:

In [None]:
from pandas.plotting import scatter_matrix

sns.set_palette('dark')
sns.set_context('notebook')
sns.set_style('white')

# get the names of columns that are integers or doubles
numeric_features = [t[0] for t in data.dtypes if t[1] in ['int', 'double']]

# sample 10% of this data and convert to a Pandas dataframe
sampled_data = data.select(numeric_features).sample(False, 0.1).toPandas()

# make the scatter plot
axs = scatter_matrix(sampled_data, figsize=(12, 12));

# Rotate axis labels and remove axis ticks
n = len(sampled_data.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())