## Productionization - scaling the process for 70 million events / day

This notebook describes the driver safety estimation process in Spark. In order to use it, one needs to install pyspark or run on a spark cluster. 

Follow these instructions to install pyspark:
- Windows: https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c
- MacOS / Linux: https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f

Initiate Spark session:

In [2]:
spark

In [3]:
import pyarrow
import pandas as pd
import numpy as np

Load data from CSV into spark:

In [4]:
dfRaw = spark.createDataFrame(pd.read_csv('dataset.csv'))

In [5]:
dfRaw.show()

+----------+--------+-----------------+------------------+-------------------+----------+--------------------+
|Unnamed: 0|DriverId|        EventName|          Latitude|          Longitude|Speed km/h|                  ts|
+----------+--------+-----------------+------------------+-------------------+----------+--------------------+
|         0|       0|      Timed Event|34.186630640899025| -118.0881019771853|      64.0|2017-11-01 00:00:...|
|         1|       0|   Distance Event| 34.18605950880732|-118.08924119475836|      53.0|2017-11-01 00:00:...|
|         2|       0|   Distance Event|34.186408023905244|-118.08956044011869|      34.0|2017-11-01 00:00:...|
|         3|       0|   Distance Event|34.187478566679914|-118.08891455921557|      33.0|2017-11-01 00:00:...|
|         4|       0|   Distance Event| 34.18866466110383|-118.08645912965711|      32.0|2017-11-01 00:00:...|
|         5|       0|   Distance Event| 34.18817105525333|-118.08727910558605|      47.0|2017-11-01 00:00:...|
|

### Data opreparation

In [11]:
import pyspark.sql.functions as psf

RELEVANT_EVENTS = ['Harsh Acceleration', 'Reached max speed', 'Out of max speed',
       'Harsh Braking', 'Harsh Turn (motion based)',
       'Harsh Braking (motion based)', 'Harsh Acceleration (motion based)',
       'Harsh Turn Left (motion based)', 'Harsh Turn Right (motion based)']

## Filter out unwanted events
df = dfRaw.filter(dfRaw.EventName.isin(RELEVANT_EVENTS))
print ("{} samples after removing events that are not relevant for modeling".format(df.count()))
df = df.select("DriverId","ts","EventName","Latitude","Longitude","Speed km/h")


####### RUN PER POPULATION SEGMENT #######

## Keep only the drivers in the current segment. This notebook runs multiple times in DataBricks, once for each population segment.
## Each time we filter the drivers in this segment and perform the analysis on these drivers. 
## It is commented out since we don't have any segments in the sample data.

#populationSegment = Segments.filter('SegmentId == "' + segmentId + '"')
#df = df.join(PopulationSegment,"DriverId")
#print ('number of drivers: {}'.format(populationSegment.count()))
#print(str(df.count()) + " Events after segmentation")

##########################################

# Remove NAs
df = df.dropna()
print("Removed NAs")

# Filter out users with too few samples
minRecordsPerDriver=50

subs = df.groupBy('DriverId').count().filter('count>{}'.format(minRecordsPerDriver))
print('Number of drivers = {}'.format(subs.count()))
df = df.join(subs,['DriverId'])

cnt = df.count()
print("{} events remained after cleansing".format(cnt))


75654 samples after removing events that are not relevant for modeling
Removed NAs
Number of drivers = 189
74730 events remained after cleansing


### Calculate total drive distance per driver

In [12]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
import numpy as np
from pyspark.sql.types import *

schema = StructType([
  StructField("Distance", FloatType()),
  StructField("DriverId", IntegerType())

])


def haversine(lon1, lat1, lon2, lat2):
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2

    c = 2 * np.arcsin(np.sqrt(a))
    km = 6367 * c
    return km

@pandas_udf(schema,PandasUDFType.GROUPED_MAP)
def total_distance(oneDriver):
    dist = haversine(oneDriver.Longitude.shift(1), oneDriver.Latitude.shift(1),
                 oneDriver.loc[1:, 'Longitude'], oneDriver.loc[1:, 'Latitude'])
    return pd.DataFrame({"DriverId":oneDriver['DriverId'].iloc[0],"Distance":np.sum(dist)},index = [0])

## Calculate the overall distance made by each subscriber
distancePerDriver = dfRaw.groupBy('DriverId').apply(total_distance)
#distancePerDriver = distancePerDriver[distancePerDriver.Distance > 0]
distancePerDriver.sort('DriverId').show()

+---------+--------+
| Distance|DriverId|
+---------+--------+
|1753.2018|       0|
| 1918.817|       1|
|1074.2893|       2|
| 391.8667|       3|
| 534.5035|       4|
|2273.0337|       5|
| 586.1509|       6|
|1125.1642|       7|
|1849.9005|       8|
|426.77353|       9|
|485.79166|      10|
|843.41595|      11|
|927.64954|      12|
|149.02852|      13|
| 2094.504|      14|
|1325.4692|      15|
|1341.9535|      16|
|996.11487|      17|
|1215.6517|      18|
| 510.8158|      19|
+---------+--------+
only showing top 20 rows



### Feature engineering

In [17]:
featureNames = ['Harsh Turn (motion based)','Harsh Acceleration (motion based)','Harsh Braking (motion based)','OverSpeed']
colNames = featureNames[:]
colNames.append('DriverId')

print("Starting feature engineering with {} samples".format(df.count()))

## Pivot the data from events to features: 1. Count number of events per type per driver, 2. Pivot into a data frames of drivers X features
subs = df.groupby('DriverId').count()

eventsPerDriver = df.groupby(["DriverId","EventName"]).count()

eventsPerDriver = eventsPerDriver.groupby("DriverId").pivot('EventName').sum('count')
eventsPerDriver = eventsPerDriver.na.fill(0)


## Add total drive duration per driver
eventsPerDriver = distancePerDriver.join(eventsPerDriver,'DriverId')
print("{} drivers after join with distances".format(eventsPerDriver.count()))
## Divide the number of events per subscriber with the total driver duration per subscriber
for fea in featureNames:
    if fea in eventsPerDriver.columns:
        eventsPerDriver = eventsPerDriver.withColumn(fea,psf.col(fea)/eventsPerDriver.Distance)
    else:
        eventsPerDriver = eventsPerDriver.withColumn(fea,psf.lit(0))

## Keep only feature columns
eventsPerDriver = eventsPerDriver.select(colNames)


Starting feature engineering with 74730 samples
189 drivers after join with distances


In [18]:
display(eventsPerDriver)

DataFrame[Harsh Turn (motion based): double, Harsh Acceleration (motion based): double, Harsh Braking (motion based): double, OverSpeed: int, DriverId: int]

Switch from a PySpark data frame to a Pandas data frame for the rest of the analysis:

In [19]:
features = eventsPerDriver.toPandas().set_index('DriverId')

### Handle outliers

In [20]:
import scipy.stats as st

def transform_to_normal(x):
    xt = np.zeros(len(x))
    if np.count_nonzero(x) == 0:
        print("only zero valued values found")
        return x
    
    valueGreaterThanZero = np.where(x<=0,0,1)
    positives = x[valueGreaterThanZero == 1]
    if(len(positives)> 0):
        xt[valueGreaterThanZero == 1],_ = st.boxcox(positives+1)
    xt = xt - np.min(xt)
    return xt

def replace_outliers_with_limit(x, stdFactor = 2.5):
    x = x.values
    xt = np.zeros(len(x))
    if np.count_nonzero(x) == 0:
        print("only zero valued values found\n")
        return x
    
    xt = transform_to_normal(x)
    
    xMean, xStd = np.mean(xt), np.std(xt)
    outliers = np.where(xt > xMean + stdFactor*xStd)[0]
    inliers = np.where(xt <= xMean + stdFactor*xStd)[0]
    if len(outliers) > 0:
        print("found outlier with factor: "+str(stdFactor)+" : "+str(outliers))
        xinline = x[inliers]
        maxInRange = np.max(xinline)
        print("replacing outliers {} with max={}".format(outliers,maxInRange))
        vals = x.copy()
        vals[outliers] = maxInRange
    return x

cleanFeatures = features.apply(replace_outliers_with_limit)
cleanFeatures.head(10)

found outlier with factor: 2.5 : [ 20  23  32  54  62  85 142]
replacing outliers [ 20  23  32  54  62  85 142] with max=0.032507542602764665
only zero valued values found



Unnamed: 0_level_0,Harsh Turn (motion based),Harsh Acceleration (motion based),Harsh Braking (motion based),OverSpeed
DriverId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
29,0.0,0.0,0.0,0
65,0.024376,0.014626,0.005485,0
191,0.008038,0.008931,0.03215,0
222,0.002871,0.0,0.009569,0
19,0.0,0.005873,0.003915,0
54,0.0,0.0,0.0,0


In [21]:
import scipy.stats as st

def extract_cummulative_prob_from_dist(x):
    
    print("extracting commulative probs for series:",x.name,"length",len(x))
    #print(x.head())
    
    xPositive = x[x>0]
    xNonPositive = x[x<=0]
    
    ratioOfZeroEvents = len(xNonPositive)/len(x)
    probs = np.zeros(len(x))
    if(len(xPositive)>0):
        params = st.expon.fit(xPositive)
        arg = params[:-2]
        loc = params[-2]
        scale = params[-1]
        print('params = {}, {}, {}.'.format(arg,loc,scale))
        probs[x>0] = st.expon.cdf(xPositive, loc=loc, scale=scale, *arg)

    return probs
  
cdfs = cleanFeatures.apply(extract_cummulative_prob_from_dist).add_suffix("_CDF")
cdfs.head(10)

extracting commulative probs for series: Harsh Turn (motion based) length 189
params = (), 0.0006723457225678595, 0.027154269594805047.
extracting commulative probs for series: Harsh Turn (motion based) length 189
params = (), 0.0006723457225678595, 0.027154269594805047.
extracting commulative probs for series: Harsh Acceleration (motion based) length 189
params = (), 0.00047724328580577187, 0.010651619847116538.
extracting commulative probs for series: Harsh Braking (motion based) length 189
params = (), 0.0012179933230326801, 0.028236240397887587.
extracting commulative probs for series: OverSpeed length 189


Unnamed: 0_level_0,Harsh Turn (motion based)_CDF,Harsh Acceleration (motion based)_CDF,Harsh Braking (motion based)_CDF,OverSpeed_CDF
DriverId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
29,0.0,0.0,0.0,0.0
65,0.582276,0.735071,0.140243,0.0
191,0.237564,0.547797,0.665622,0.0
222,0.077766,0.0,0.256027,0.0
19,0.0,0.397437,0.091106,0.0
54,0.0,0.0,0.0,0.0
0,0.406299,0.008706,0.587733,0.0
113,0.398448,0.0,0.441974,0.0
155,0.186887,0.052206,0.379146,0.0
167,0.275357,0.419959,0.803008,0.0


In [22]:
from scipy.stats import rankdata

NUM_OF_FEATURES = 3.0

cdfs['metric'] = cdfs.sum(axis = 1) / NUM_OF_FEATURES
cdfs = cdfs.sort_values('metric')
cdfs['rank'] = cdfs.metric.rank(method="min")/len(cdfs)*1.0

finalDF = spark.createDataFrame(cdfs.reset_index())

In [None]:
finalDF.head(10)