In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as F
spark = SparkSession.builder.config("spark.driver.memory", "4g").appName('us_accidents').getOrCreate()
sqlContext =SQLContext(spark)

In [3]:
from pyspark.sql.types import *
schema = StructType([StructField('ID', StringType(),True),
                    StructField('Source', StringType(), True),
                    StructField('TMC', DoubleType(), True),
                    StructField('Severity', IntegerType(), True),
                    StructField('Start_Time', StringType(), True),
                    StructField('End_Time', StringType(), True),
                    StructField('Start_Lat', DoubleType(), True),
                    StructField('Start_Lng', DoubleType(), True),
                    StructField('End_Lat', DoubleType(), True),
                    StructField('End_Lng', DoubleType(), True),
                    StructField('Distance(mi)', DoubleType(), True),
                    StructField('Description', StringType(), True),
                    StructField('Number', DoubleType(), True),
                    StructField('Street', StringType(), True),
                    StructField('Side', StringType(), True),
                    StructField('City', StringType(), True),
                    StructField('County', StringType(), True),
                    StructField('State', StringType(), True),
                    StructField('Zipcode', StringType(), True),
                    StructField('Country', StringType(), True),
                    StructField('TimeZone', StringType(), True),
                    StructField('Airport_Code', StringType(), True),
                    StructField('Weather_Timestamp', StringType(), True),
                    StructField('Temperature(F)', DoubleType(), True),
                    StructField('Wind_Chill(F)', DoubleType(), True),
                    StructField('Humidity(%)', DoubleType(), True),
                    StructField('Pressure(in)', DoubleType(), True),
                    StructField('Visibility(mi)', DoubleType(), True),
                    StructField('Wind_Direction', StringType(), True),
                    StructField('Wind_Speed(mph)', DoubleType(), True),
                    StructField('Precipitation(in)', DoubleType(), True),
                    StructField('Weather_Condition', StringType(), True),
                    StructField('Amenity', StringType(), True),
                    StructField('Bump', StringType(), True),
                    StructField('Crossing', StringType(), True),
                    StructField('Give_Way', StringType(), True),
                    StructField('Junction', StringType(), True),
                    StructField('No_Exit', StringType(), True),
                    StructField('Railway', StringType(), True),
                    StructField('Roundabout', StringType(), True),
                    StructField('Station', StringType(), True),
                    StructField('Stop', StringType(), True),
                    StructField('Traffic_Calming', StringType(), True),
                    StructField('Traffic_Signal', StringType(), True),
                    StructField('Turning_Loop', StringType(), True),
                    StructField('Sunrise_Sunset', StringType(), True),
                    StructField('Civil_Twilight', StringType(), True),
                    StructField('Nautical_Twilight', StringType(), True),
                    StructField('Astronomical_Twilight', StringType(), True)])
           

df = spark.read.format("csv").option("header",True).option("delimeter", ",")\
                        .schema(schema).load("US_Accidents_June20.csv")

In [4]:
print((df.count(), len(df.columns)))

(3513617, 49)


## 'count','mean','std','min','25%','50%','75%','max'

In [5]:
df.summary().select('ID','Source','TMC','Severity','Start_Time').show()

+--------+-------------+------------------+------------------+-------------------+
|      ID|       Source|               TMC|          Severity|         Start_Time|
+--------+-------------+------------------+------------------+-------------------+
| 3513617|      3513617|           2478818|           3513617|            3513617|
|    null|         null|208.02258052023183|2.3399286262560772|               null|
|    null|         null|20.766272454850682|0.5521934519055779|               null|
|     A-1|         Bing|             200.0|                 1|2016-02-08 00:37:08|
|    null|         null|             201.0|                 2|               null|
|    null|         null|             201.0|                 2|               null|
|    null|         null|             201.0|                 3|               null|
|A-999999|MapQuest-Bing|             406.0|                 4|2020-06-30 23:18:09|
+--------+-------------+------------------+------------------+-------------------+



In [6]:
df.summary().select('End_Time','Start_Lat','End_Lat','End_Lng','Distance(mi)').show()

+-------------------+-----------------+-----------------+-------------------+------------------+
|           End_Time|        Start_Lat|          End_Lat|            End_Lng|      Distance(mi)|
+-------------------+-----------------+-----------------+-------------------+------------------+
|            3513617|          3513617|          1034799|            1034799|           3513617|
|               null|36.54194510119375|37.55757770725813|-100.45598055624444| 0.281616686738409|
|               null|4.883520196081629| 4.86121476783781| 18.528793146598634|1.5501343247484947|
|2016-02-08 06:37:08|        24.555269|         24.57011|        -124.497829|               0.0|
|               null|        33.637608|         33.99477|         -118.34504|               0.0|
|               null|        35.916901|         37.79724|          -97.03692|               0.0|
|               null|         40.32217|         41.05063|         -82.106059|              0.01|
|2020-06-30 23:47:46|        4

In [7]:
df.summary().select('Description','Number','Street','Side','City').show()

+--------------------+-----------------+----------------+-------+----------+
|         Description|           Number|          Street|   Side|      City|
+--------------------+-----------------+----------------+-------+----------+
|             3513616|          1250753|         3513617|3513617|   3513505|
|                null|5975.382687868828|            null|   null|      null|
|                null|  14966.240165419|            null|   null|      null|
| middle lane bloc...|              0.0|       1 Mile Rd|       |Aaronsburg|
|                null|            863.0|            null|   null|      null|
|                null|           2798.0|            null|   null|      null|
|                null|           7098.0|            null|   null|      null|
|~6 lane blocked d...|        9999997.0|william Carey Dr|      R|   Zwingle|
+--------------------+-----------------+----------------+-------+----------+



In [8]:
df.summary().select('County','State','Zipcode','Country','TimeZone').show()

+---------+-------+------------------+-------+----------+
|   County|  State|           Zipcode|Country|  TimeZone|
+---------+-------+------------------+-------+----------+
|  3513617|3513617|           3512548|3513617|   3509737|
|     null|   null|59837.760470691836|   null|      null|
|     null|   null| 30723.36767860533|   null|      null|
|Abbeville|     AL|             01001|     US|US/Central|
|     null|   null|           30815.0|   null|      null|
|     null|   null|           65806.0|   null|      null|
|     null|   null|           91355.0|   null|      null|
|     Yuma|     WY|             99403|     US|US/Pacific|
+---------+-------+------------------+-------+----------+



In [9]:
df.summary().select('Airport_Code','Weather_Timestamp','Temperature(F)','Wind_Chill(F)','Humidity(%)').show()

+------------+-------------------+------------------+------------------+------------------+
|Airport_Code|  Weather_Timestamp|    Temperature(F)|     Wind_Chill(F)|       Humidity(%)|
+------------+-------------------+------------------+------------------+------------------+
|     3506859|            3470294|           3447885|           1645368|           3443930|
|        null|               null| 61.93511900773892|53.557295328460995| 65.11427003452451|
|        null|               null|18.621056594747568| 23.77333678193986|22.755581256697734|
|        K01M|2016-02-08 00:53:00|             -89.0|             -89.0|               1.0|
|        null|               null|              50.0|              35.7|              48.0|
|        null|               null|              64.0|              57.0|              67.0|
|        null|               null|              75.9|              72.0|              84.0|
|        KZZV|2020-06-30 23:34:00|             170.6|             115.0|        

In [10]:
df.summary().select('Pressure(in)','Visibility(mi)', 'Wind_Direction','Wind_Speed(mph)','Precipitation(in)').show()

+------------------+------------------+--------------+-----------------+--------------------+
|      Pressure(in)|    Visibility(mi)|Wind_Direction|  Wind_Speed(mph)|   Precipitation(in)|
+------------------+------------------+--------------+-----------------+--------------------+
|           3457735|           3437761|       3454743|          3059008|             1487743|
|29.744628810478126|  9.12264429086265|          null|8.219024631515838|0.015982558815601934|
|0.8319758234849882|2.8858793265902425|          null|5.262847339451445| 0.19282619342581409|
|               0.0|               0.0|          CALM|              0.0|                 0.0|
|             29.73|              10.0|          null|              5.0|                 0.0|
|             29.95|              10.0|          null|              7.0|                 0.0|
|             30.09|              10.0|          null|             11.5|                 0.0|
|             57.74|             140.0|          West|      

In [4]:
"""
Counts number of nulls and nans in each column
"""
df2 = df.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for (c,c_type) in df.dtypes]).toPandas()

if len(df2) == 0:
    print("There are no any missing values!")
    
print(df2.rename(index={0: 'count'}).T.sort_values("count",ascending=False))

                         count
End_Lng                2478818
End_Lat                2478818
Number                 2262864
Precipitation(in)      2025874
Wind_Chill(F)          1868249
TMC                    1034799
Wind_Speed(mph)         454609
Weather_Condition        76138
Visibility(mi)           75856
Humidity(%)              69687
Temperature(F)           65732
Wind_Direction           58874
Pressure(in)             55882
Weather_Timestamp        43323
Airport_Code              6758
TimeZone                  3880
Zipcode                   1069
Sunrise_Sunset             115
Civil_Twilight             115
Nautical_Twilight          115
Astronomical_Twilight      115
City                       112
Description                  1
Country                      0
Junction                     0
Severity                     0
Start_Time                   0
End_Time                     0
Turning_Loop                 0
Traffic_Signal               0
Traffic_Calming              0
Stop    

## Removing useless attributes

In [4]:
useless = ["ID", "Source", "End_Lat", "End_Lng", "Number", "Street", "Zipcode", "Timezone", "Country", "Airport_Code", "Weather_Timestamp", "Sunrise_Sunset", "Nautical_Twilight", "Astronomical_Twilight"]
df = df.drop(*useless)

In [5]:
df.printSchema()

root
 |-- TMC: double (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- End_Time: string (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Wind_Direction: string (nullable = true)
 |-- Wind_Speed(mph): double (nullable = true)
 |-- Precipitation(in): double (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Amenity: string (nullable = true)
 |-- Bump: string (nullable = true)
 |-- Crossing: string (nullable

## Removing rows with almost one missing column

In [6]:
df = df.dropna()

In [7]:
print((df.count(), len(df.columns)))

(781146, 35)


In [8]:
"""
Counts number of nulls and nans in each column
"""
df2 = df.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for (c,c_type) in df.dtypes]).toPandas()

if len(df2) == 0:
    print("There are no any missing values!")
    
print(df2.rename(index={0: 'count'}).T.sort_values("count",ascending=False))

                   count
TMC                    0
No_Exit                0
Weather_Condition      0
Amenity                0
Bump                   0
Crossing               0
Give_Way               0
Junction               0
Railway                0
Wind_Speed(mph)        0
Roundabout             0
Station                0
Stop                   0
Traffic_Calming        0
Traffic_Signal         0
Turning_Loop           0
Precipitation(in)      0
Wind_Direction         0
Severity               0
Side                   0
Start_Time             0
End_Time               0
Start_Lat              0
Start_Lng              0
Distance(mi)           0
Description            0
City                   0
Visibility(mi)         0
County                 0
State                  0
Temperature(F)         0
Wind_Chill(F)          0
Humidity(%)            0
Pressure(in)           0
Civil_Twilight         0


## Correlation Matrix

In [9]:
from pyspark.mllib.stat import Statistics
import pandas as pd


# result can be used w/ seaborn's heatmap
def compute_correlation_matrix(df, method='pearson'):
    # wrapper around
    # https://forums.databricks.com/questions/3092/how-to-calculate-correlation-matrix-with-all-colum.html
    df_rdd = df.rdd.map(lambda row: row[0:])
    corr_mat = Statistics.corr(df_rdd, method=method)
    corr_mat_df = pd.DataFrame(corr_mat,
                    columns=df.columns, 
                    index=df.columns)
    return corr_mat_df

numeric_features = [t[0] for t in df.dtypes if t[1] == 'int' or t[1]=='double']
df2=df.select(numeric_features)
print(df2)

print(compute_correlation_matrix(df2))

DataFrame[TMC: double, Severity: int, Start_Lat: double, Start_Lng: double, End_Lat: double, End_Lng: double, Distance(mi): double, Number: double, Temperature(f): double, Wind_chill(f): double, Humidity(%): double, Pressure(in): double, Visibility(mi): double, Wind_Speed(mph): double, Precipitation(in): double]
                            TMC  Severity  Start_Lat  Start_Lng       End_Lat  \
TMC                1.000000e+00  0.132549  -0.004657  -0.013330 -8.829529e-08   
Severity           1.325486e-01  1.000000   0.047617   0.083705  2.503775e-02   
Start_Lat         -4.657480e-03  0.047617   1.000000  -0.013994  5.403886e-01   
Start_Lng         -1.332991e-02  0.083705  -0.013994   1.000000 -1.472758e-02   
End_Lat           -8.829529e-08  0.025038   0.540389  -0.014728  1.000000e+00   
End_Lng            1.828266e-08  0.179512  -0.013581   0.578980 -2.504327e-02   
Distance(mi)       3.617129e-02  0.150326   0.062742   0.048143  1.544524e-02   
Number             1.666052e-02  0.009

## Transform no-numeric features

In [9]:
no_numeric = ["Source", "Start_Time", "End_Time", "Description", "Street", "Side", "City", "County", "State", "Zipcode", "Country", "TimeZone", "Airport_Code", "Weather_Timestamp", "Wind_Direction", "Weather_Condition", "Amenity", "Bump", "Crossing", "Give_Way", "Junction", "No_Exit", "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming", "Traffic_Signal", "Turning_Loop", "Sunrise_Sunset", "Civil_Twilight", "Nautical_Twilight", "Astronomical_Twilight"]
numeric_features = ["TMC", "Severity", "Start_Lat", "Start_Lng", "End_Lat", "End_Lng", "Distance(mi)", "Number", "Temperature(f)", "Wind_chill(f)", "Humidity(%)", "Pressure(in)", "Visibility(mi)", "Wind_Speed(mph)", "Precipitation(in)"]
output = "Severity"

## Start and end time to Duration

In [10]:
features_dates = ["Start_Time", "End_Time"]

In [11]:
from pyspark.sql.functions import col, unix_timestamp, to_date

for feature in features_dates:
    df = df.withColumn(feature+"Vec", unix_timestamp(feature, "yyyy-MM-dd HH:mm:ss"))
    df = df.drop(feature)

In [12]:
df = df.withColumn("Duration", df["End_TimeVec"] - df["Start_TimeVec"])

In [13]:
df = df.drop(*["End_TimeVec", "Start_TimeVec"])

### Text embedding for Description column

In [20]:
"""from pyspark.ml.feature import Word2Vec

df = df.withColumn("Description", F.split(df["Description"], " "))

word2Vec = Word2Vec(inputCol="Description", outputCol="DescriptionVec")
model = word2Vec.fit(df)

df = model.transform(df)"""

### OneHotEncoder

In [14]:
fetures_toencode = ["Side", "City", "County", "State", "Wind_Direction", "Weather_Condition", "Amenity", "Bump", "Crossing", "Give_Way", "Junction", "No_Exit", "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming", "Traffic_Signal", "Turning_Loop", "Civil_Twilight"]
fetures_encoded = [feature+"Vec" for feature in fetures_toencode]

In [15]:
import pyspark.sql.functions as f

for feature in fetures_toencode:
    df = df.withColumn(feature,f.lower(f.col(feature)))

In [17]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

for index_feature in range(len(fetures_toencode)):
    indexer = StringIndexer(inputCol=fetures_toencode[index_feature], outputCol=fetures_encoded[index_feature])
    model = indexer.fit(df)
    df = model.transform(df)

### Dates

In [18]:
df.printSchema()

root
 |-- TMC: double (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Wind_Direction: string (nullable = true)
 |-- Wind_Speed(mph): double (nullable = true)
 |-- Precipitation(in): double (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Amenity: string (nullable = true)
 |-- Bump: string (nullable = true)
 |-- Crossing: string (nullable = true)
 |-- Give_Way: string (nullable = true)
 |-- Junction: string (nullable =

## Transforming to vector

In [19]:
features = ["TMC", "Start_Lat", "Start_Lng", "Distance(mi)", "Temperature(F)", "Wind_Chill(F)", "Humidity(%)", "Pressure(in)", "Visibility(mi)", "Wind_Speed(mph)", "Precipitation(in)", "Duration", "SideVec", "CityVec", "CountyVec", "StateVec", "Wind_DirectionVec", "Weather_ConditionVec", "AmenityVec", "BumpVec", "CrossingVec", "Give_WayVec", "JunctionVec", "No_ExitVec", "RailwayVec", "RoundaboutVec", "StationVec", "StopVec", "Traffic_CalmingVec", "Traffic_SignalVec", "Turning_LoopVec", "Civil_TwilightVec"]

In [20]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=features, outputCol="features")
v_df = vectorAssembler.transform(df)
v_df = v_df.select(["features", "Severity"])
v_df = v_df.withColumnRenamed("Severity", "label")

In [24]:
splits = v_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

### Free memory

In [67]:
df.printSchema()

root
 |-- TMC: double (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Wind_Direction: string (nullable = true)
 |-- Wind_Speed(mph): double (nullable = true)
 |-- Precipitation(in): double (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Amenity: string (nullable = true)
 |-- Bump: string (nullable = true)
 |-- Crossing: string (nullable = true)
 |-- Give_Way: string (nullable = true)
 |-- Junction: string (nullable =

In [21]:
df.unpersist()

DataFrame[ID: string, Source: string, TMC: double, Severity: int, Start_Time: string, End_Time: string, Start_Lat: double, Start_Lng: double, End_Lat: double, End_Lng: double, Distance(mi): double, Description: array<string>, Number: double, Street: string, Side: string, City: string, County: string, State: string, Zipcode: string, Country: string, TimeZone: string, Airport_Code: string, Weather_Timestamp: string, Temperature(f): double, Wind_chill(f): double, Humidity(%): double, Pressure(in): double, Visibility(mi): double, Wind_Direction: string, Wind_Speed(mph): double, Precipitation(in): double, Weather_Condition: string, Amenity: string, Bump: string, Crossing: string, Give_Way: string, Junction: string, No_Exit: string, Railway: string, Roundabout: string, Station: string, Stop: string, Traffic_Calming: string, Traffic_Signal: string, Turning_Loop: string, Sunrise_Sunset: string, Civil_Twilight: string, Nautical_Twilight: string, Astronomical_Twilight: string, DescriptionVec: ve

## Logistic Regression

In [22]:
from pyspark.ml.classification import LogisticRegression

In [23]:
lr = LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0.8)

In [25]:
lrModel = lr.fit(train_df)

In [26]:
predictions = lrModel.transform(test_df)

In [27]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import DoubleType

In [28]:
predictions_raw = predictions.withColumn("prediction", col("prediction")) \
                             .withColumn("label", col("label").cast(DoubleType())) \
                             .drop('features') \
                             .drop('rawPrediction') \
                             .drop('probability')

In [29]:
metrics = MulticlassMetrics(predictions_raw.rdd)

In [30]:
print("Summary Stats for LogisticRegression")
print("Accuracy = %s" % metrics.accuracy)
print("Precision = %s" % metrics.weightedPrecision)
print("Recall = %s" % metrics.weightedRecall)
print("F1 Score = %s" % metrics.weightedFMeasure())

Summary Stats for LogisticRegression
Accuracy = 0.6936325143842516
Precision = 1.0
Recall = 0.6936325143842516
F1 Score = 0.8191062801323618


In [31]:
lrModel.save("LogisticRegression")