Note: This notebook must be run with Spark (PySpark); the Spark session is created in the first code cell.

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType

import pandas as pd
import numpy as np
import pygeohash as pgh
from haversine import haversine

# Local Spark session with a single worker thread ("local"); getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = SparkSession.builder.master("local").appName("my_local").getOrCreate()


In [None]:
# Load the raw inputs. vehicle.csv is read with its header row but without schema
# inference, so every vehicle column comes in as a string.
drive, vehicle, weather, trip = (
    spark.read.parquet('drive/'),
    spark.read.csv('vehicle.csv', header=True),
    spark.read.parquet('weather/'),
    spark.read.parquet('trip/'),
)

In [5]:
drive.show(n=10)

+----------+--------------------+-------------------+--------+-------+-------+-------+-------------------+--------+----------+----+-------+-----------------+
|vehicle_id|             trip_id|           datetime|velocity|accel_x|accel_y|accel_z|engine_coolant_temp|eng_load|fuel_level| iat|    rpm|__index_level_0__|
+----------+--------------------+-------------------+--------+-------+-------+-------+-------------------+--------+----------+----+-------+-----------------+
|   1000516|36417dfecb2049769...|2017-01-08 11:00:00|     0.0|  83.12|  72.16|  69.85|               91.0|   210.6|    101.88|82.0|2123.04|                0|
|   1000516|36417dfecb2049769...|2017-01-08 11:00:01|   82.28|  82.08|  75.38|  78.92|               90.0|  209.45|    104.43|80.0|2119.44|                1|
|   1000516|36417dfecb2049769...|2017-01-08 11:00:02|   83.72|  72.93|  71.72|  76.73|               98.0|  217.33|    104.86|63.0|2118.09|                2|
|   1000516|36417dfecb2049769...|2017-01-08 11:00:03

In [547]:
trip.show(n=10)

+----------+--------------------+-------------------+-------+------------------+--------+-----------------+
|vehicle_id|             trip_id|           datetime|    lat|              long|velocity|__index_level_0__|
+----------+--------------------+-------------------+-------+------------------+--------+-----------------+
|   1000516|36417dfecb2049769...|2017-01-08 11:00:00|31.8125|          -83.9375|     0.0|                0|
|   1000516|36417dfecb2049769...|2017-01-08 11:00:01|31.8125|-83.93722222222223|   82.28|                1|
|   1000516|36417dfecb2049769...|2017-01-08 11:00:02|31.8125|-83.93694444444445|   83.72|                2|
|   1000516|36417dfecb2049769...|2017-01-08 11:00:03|31.8125|-83.93666666666667|   67.14|                3|
|   1000516|36417dfecb2049769...|2017-01-08 11:00:04|31.8125|-83.93638888888889|   44.04|                4|
|   1000516|36417dfecb2049769...|2017-01-08 11:00:05|31.8125|-83.93611111111112|   70.02|                5|
|   1000516|36417dfecb204976

In [7]:
vehicle.show(n=10)

+----------+----+----------+-----------+----------+----------+--------------+------------------+--------------+-------------------+---------+------------------+-----------------+--------------------+---------+----------------+-----------------+
|vehicle_id|year|      make|      Model|drivetrain|max_torque|max_horsepower|max_horsepower_rpm|max_torque_rpm|engine_displacement|fuel_type|fuel_tank_capacity|fuel_economy_city|fuel_economy_highway|cylinders|forced_induction|device_generation|
+----------+----+----------+-----------+----------+----------+--------------+------------------+--------------+-------------------+---------+------------------+-----------------+--------------------+---------+----------------+-----------------+
|   1000500|2016|     Honda|      Civic|         2|       174|           140|              6500|          1500|              1.799|     1059|                47|             16.5|                  20|        4|            1054|                5|
|   1000501|2016|   

### Question 1 - engine_features.csv

Join the drive and vehicle DataFrames.
The vehicle file is very small, so it can be broadcast in the join.

In [55]:
# Engine readings needed for the utilisation features. vehicle_id is cast to string so it
# matches vehicle.csv, which is read without schema inference (all columns are strings).
drive2 = (drive
          .select('vehicle_id', 'datetime', 'eng_load', 'rpm')
          .withColumn('vehicle_id', F.col('vehicle_id').cast('string')))

vehicle2 = vehicle.select('vehicle_id', 'max_torque', 'max_horsepower', 'max_horsepower_rpm')

# The vehicle table is tiny, so broadcast it rather than shuffling the drive data.
# The left join keeps drive rows whose vehicle is missing from vehicle.csv (specs become null).
drive2 = drive2.join(F.broadcast(vehicle2), on='vehicle_id', how='left')

In [58]:
# Convert the reading time to Pacific time and replace missing vehicle specs with 0.
# NOTE(review): assumes `datetime` is stored in UTC — confirm with the data owner.
# 'America/Los_Angeles' is the region ID that the legacy 'PST' abbreviation resolves to;
# Spark recommends region IDs because three-letter abbreviations can be ambiguous.
drive2 = (drive2
          .withColumn('pst_timestamp', F.from_utc_timestamp('datetime', 'America/Los_Angeles'))
          .drop('datetime')
          .na.fill({'max_torque': 0, 'max_horsepower': 0, 'max_horsepower_rpm': 0}))

In [61]:
# Per-reading utilisation ratios.
# NOTE(review): eng_load is divided by 255, i.e. treated as a raw 0-255 value — confirm.
ENG_LOAD_FULL_SCALE = 255
# horsepower = torque (lb-ft) * rpm / 5252
HP_TORQUE_RPM_CONSTANT = 5252


def _safe_ratio(numerator, denominator):
    """Column numerator / denominator, or null when the denominator is 0.

    Missing specs were zero-filled above. Plain division by 0 gives null only when ANSI
    mode is off and raises when it is on; this guard gives null in both modes.
    """
    return F.when(denominator.cast('double') != 0, numerator / denominator)


drive2 = (drive2
          .withColumn('active_horsepower',
                      ((F.col('eng_load') / ENG_LOAD_FULL_SCALE)
                       * (F.col('max_torque') * F.col('rpm'))) / HP_TORQUE_RPM_CONSTANT)
          .withColumn('horsepower_util', _safe_ratio(F.col('active_horsepower'), F.col('max_horsepower')))
          .withColumn('torque_util', F.col('eng_load') / ENG_LOAD_FULL_SCALE)
          .withColumn('rpm_util', _safe_ratio(F.col('rpm'), F.col('max_horsepower_rpm')))
          # ratios against a missing (0) spec are reported as 0
          .na.fill({'horsepower_util': 0, 'rpm_util': 0}))

In [62]:
schema = StructType([
    StructField("vehicle_id", StringType()),
    StructField("week_start_date", TimestampType()),
    StructField("ft_torque_util_60pct_s", DoubleType()),
    StructField("ft_torque_util_70pct_s", DoubleType()),
    StructField("ft_torque_util_80pct_s", DoubleType()),
    StructField("ft_torque_util_90pct_s", DoubleType()),
    StructField("ft_horsepower_util_50pct_s", DoubleType()),
    StructField("ft_horsepower_util_60pct_s", DoubleType()),
    StructField("ft_horsepower_util_70pct_s", DoubleType()),
    StructField("ft_horsepower_util_80pct_s", DoubleType()),
    StructField("ft_rpm_util_50pct_s", DoubleType()),
    StructField("ft_rpm_util_60pct_s", DoubleType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def calculate(df):
    """Weekly engine-utilisation features for the rows of one vehicle.

    Grouped-map pandas UDF. `df` holds the drive2 rows of a single vehicle_id and must have
    `vehicle_id`, `pst_timestamp`, `torque_util`, `horsepower_util` and `rpm_util`.
    Returns one row per (vehicle_id, week) with the columns of `schema`.

    Weeks run from Monday 00:00 (inclusive) to the next Monday (exclusive) and are labelled
    with their starting Monday, matching the `week_start_date` column name.
    """
    # feature -> [(output column, lower fraction, upper fraction), ...].
    # Iteration order fixes the output column order, which must match `schema`.
    bands_by_feature = {
        'torque_util': [('ft_torque_util_60pct_s', 0.60, 0.70),
                        ('ft_torque_util_70pct_s', 0.70, 0.80),
                        ('ft_torque_util_80pct_s', 0.80, 0.90),
                        ('ft_torque_util_90pct_s', 0.90, 1.0)],
        'horsepower_util': [('ft_horsepower_util_50pct_s', 0.50, 0.60),
                            ('ft_horsepower_util_60pct_s', 0.60, 0.70),
                            ('ft_horsepower_util_70pct_s', 0.70, 0.80),
                            ('ft_horsepower_util_80pct_s', 0.80, 0.90)],
        'rpm_util': [('ft_rpm_util_50pct_s', 0.50, 0.60),
                     ('ft_rpm_util_60pct_s', 0.60, 0.70)],
    }
    feature_columns = [name for bands in bands_by_feature.values() for name, _, _ in bands]

    def calculate_pct_secs(pdf, feat, pct1, pct2):
        """Seconds between the first and last reading whose running total of `feat` lies in
        [pct1 * total, pct2 * total). Returns NaN when no reading falls in that range.

        `pdf` must already be in time order.
        NOTE(review): this is the time span of a slice of the cumulative sum, not the number
        of seconds with `feat` between pct1 and pct2 — confirm against the feature spec.
        """
        total = pdf[feat].sum()
        running = pdf[feat].cumsum()
        in_band = (running >= pct1 * total) & (running < pct2 * total)
        stamps = pdf.loc[in_band, 'pst_timestamp']
        # empty band: max/min are NaT and NaT.total_seconds() is NaN
        return (stamps.max() - stamps.min()).total_seconds()

    def wrapper(pdf):
        """All features for the readings of a single vehicle-week, as a Series."""
        sol = {}
        for feat, bands in bands_by_feature.items():
            for new_feat, pct1, pct2 in bands:
                secs = calculate_pct_secs(pdf, feat, pct1, pct2)
                # NaN (empty band) and non-positive spans are reported as 0
                sol[new_feat] = secs if secs > 0 else 0
        return pd.Series(sol)

    # Spark does not guarantee row order inside a group, and cumulative sums need time order;
    # groupby keeps this order within each group.
    df = df.assign(pst_timestamp=pd.to_datetime(df['pst_timestamp']))
    df = df.sort_values('pst_timestamp', kind='stable')

    # 'W-MON' bins default to right-closed, right-labelled (week *ending* Monday);
    # left/left gives [Monday, next Monday) labelled by its starting Monday.
    weekly = pd.Grouper(key='pst_timestamp', freq='W-MON', label='left', closed='left')
    p = df.groupby(['vehicle_id', weekly]).apply(wrapper).reset_index()
    p.columns = ['vehicle_id', 'week_start_date'] + feature_columns
    return p
    



In [63]:
# Weekly engine features per vehicle, with the week rendered as a calendar date.
# Lower-case 'yyyy'/'dd' are calendar year / day-of-month; upper-case 'YYYY'/'DD' would be
# week-based year / day-of-year.
g = (drive2.groupby('vehicle_id').apply(calculate)
     .withColumn('week_start_date', F.date_format('week_start_date', 'yyyy-MM-dd'))
     .orderBy('vehicle_id', 'week_start_date'))

In [67]:
# Write one sorted CSV file. coalesce(1) keeps the preceding orderBy ordering, whereas
# repartition(1) reshuffles rows. mode('overwrite') makes the cell re-runnable; the default
# mode fails if ./engine_features already exists.
g.coalesce(1).write.mode('overwrite').csv('./engine_features', header=True)

In [66]:
# Cleanup only: deletes the ./engine_features output written by the cell above.
# NOTE(review): its execution count (66) is lower than the write cell's (67), so it was run
# *before* the write; on Restart & Run All it runs after the write and deletes the result.
! rm -fr engine_features

### 2. drive_features

In [47]:
# Speed change between consecutive readings of each trip. The window is ordered by time;
# ordering by the partition key itself (trip_id) would leave the lag order undefined.
# The first reading of a trip has no predecessor, so its lag_velocity and diff are null.
trip_order = Window.partitionBy('trip_id').orderBy('datetime')
g2 = (drive.select('trip_id', 'datetime', 'velocity')
      .withColumn('lag_velocity', F.lag('velocity').over(trip_order))
      .withColumn('diff', F.col('velocity') - F.col('lag_velocity')))

In [50]:
schema = StructType([
    StructField("trip_id", StringType()),
    StructField("ft_cnt_vehicle_deaccel_val", IntegerType()),
    StructField("ft_sum_hard_brakes_10_flg_val", IntegerType()),
    StructField("ft_sum_hard_brakes_3_flg_val", IntegerType()),
    StructField("ft_sum_time_deaccel_val", IntegerType()),
    StructField("ft_cnt_vehicle_accel_val", IntegerType()),
    StructField("ft_sum_hard_accel_10_flg_val", IntegerType()),
    StructField("ft_sum_hard_accel_3_flg_val", IntegerType()),
    StructField("ft_sum_time_accel_val", IntegerType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute(df):
    """Per-trip acceleration / deceleration features (grouped-map pandas UDF).

    `df` holds the g2 rows of one trip (`trip_id`, `datetime`, `diff`, ...), where `diff` is
    the velocity change from the previous reading. Readings are split into maximal runs of
    consecutive negative diffs (deceleration events) and runs of non-negative diffs
    (which contain the acceleration events). Returns one row per trip_id with the columns
    of `schema`. The time features count readings and assume one reading per second.

    NOTE(review): the weather section later rebinds the name `compute`; this UDF must be
    applied before that cell runs.
    """
    from itertools import groupby

    # velocity-change thresholds (same units as `velocity`) for "hard" and "moderate" events
    hard, moderate = 10, 3

    def inner_compute(df):
        # Spark does not guarantee row order inside a group; runs only make sense in time order.
        diffs = df.sort_values('datetime', kind='stable')['diff']

        # (is_deceleration, values) for each maximal run. NaN (first reading of a trip) compares
        # False, so it joins a non-negative run and never matches any threshold below.
        runs = [(is_decel, list(vals)) for is_decel, vals in groupby(diffs, lambda d: d < 0)]
        decel_runs = [vals for is_decel, vals in runs if is_decel]
        other_runs = [vals for is_decel, vals in runs if not is_decel]

        return pd.Series({
            # number of deceleration events
            'ft_cnt_vehicle_deaccel_val': len(decel_runs),
            # events containing a drop of at least `hard`
            'ft_sum_hard_brakes_10_flg_val': sum(any(d <= -hard for d in r) for r in decel_runs),
            # events containing a drop in (-hard, -moderate]
            'ft_sum_hard_brakes_3_flg_val': sum(any(-hard < d <= -moderate for d in r) for r in decel_runs),
            # seconds spent decelerating
            'ft_sum_time_deaccel_val': int((diffs < 0).sum()),
            # acceleration events: non-negative runs with at least one increase
            'ft_cnt_vehicle_accel_val': sum(any(d > 0 for d in r) for r in other_runs),
            'ft_sum_hard_accel_10_flg_val': sum(any(d >= hard for d in r) for r in other_runs),
            'ft_sum_hard_accel_3_flg_val': sum(any(moderate <= d < hard for d in r) for r in other_runs),
            # seconds spent accelerating
            'ft_sum_time_accel_val': int((diffs > 0).sum()),
        })

    p = df.groupby('trip_id').apply(inner_compute).reset_index()
    return p

In [51]:
# Per-trip drive features, sorted by trip for output.
g3 = (g2
      .groupby('trip_id')
      .apply(compute)
      .orderBy('trip_id'))

In [53]:
# Write one sorted CSV file. coalesce(1) keeps the preceding orderBy ordering, whereas
# repartition(1) reshuffles rows. mode('overwrite') makes the cell re-runnable.
g3.coalesce(1).write.mode('overwrite').csv('./drive_features', header=True)

In [52]:
# Cleanup only: deletes the ./drive_features output written by the cell above.
# NOTE(review): its execution count (52) is lower than the write cell's (53), so it was run
# *before* the write; on Restart & Run All it runs after the write and deletes the result.
! rm -fr drive_features

### 3. weather features

In [7]:
# Start the weather section from freshly-read inputs (pandas' saved index column dropped).
weather = spark.read.parquet('weather/')
trip = spark.read.parquet('trip/').drop('__index_level_0__')

In [8]:
# Round coordinates to 4 decimals (format_number -> string -> double), rename long -> lon,
# and derive the Pacific-time date / hour keys used to join with the weather table.
trip = (trip
        .withColumn('long', F.format_number('long', 4).cast('double'))
        .withColumnRenamed('long', 'lon')
        .withColumn('lat', F.format_number('lat', 4).cast('double'))
        # NOTE(review): assumes `datetime` is UTC. 'America/Los_Angeles' is the region ID the
        # legacy 'PST' abbreviation resolves to.
        .withColumn('pst_timestamp', F.from_utc_timestamp('datetime', 'America/Los_Angeles'))
        .withColumn('time', F.hour('pst_timestamp'))
        # to_date on a timestamp needs no pattern. The previous 'YYYY-MM-DD' used week-based
        # year / day-of-year letters rather than calendar year / day-of-month.
        .withColumn('date', F.to_date('pst_timestamp').cast('string')))

In [9]:
# Normalise the weather join keys.
# `time`: drop every character that is not a digit or '+' (the '+' inside the character
# class is a literal), then cast to integer.
# NOTE(review): trip `time` is an hour 0-23, so the join only matches if weather `time`
# reduces to a bare hour — confirm the raw format (e.g. "15:00" would become 1500).
weather = (weather
           .withColumn('time', F.regexp_replace('time', r'[^\d+]', '').cast('integer'))
           .withColumn('date', F.col('date').cast('string'))
           # temperature rounded to 2 decimals via format_number (string) and back to double
           .withColumn('temperature_data', F.format_number('temperature_data', 2).cast('double')))

In [10]:
# Geohash (precision 5) of each point, used as the spatial join key between trips and weather.
# Null coordinates yield a null key instead of failing the Python UDF.
udf1 = F.udf(lambda x, y: pgh.encode(x, y, precision=5) if x is not None and y is not None else None)
trip = trip.withColumn('encoded_ll', udf1('lat', 'lon'))
weather = weather.withColumn('encoded_ll', udf1('lat', 'lon'))

# Convert Kelvin to Fahrenheit, e.g. (296 K - 273.15) * 9/5 + 32 = 73.13 °F, rounded to 2 decimals.
# A native column expression instead of a Python UDF: it stays a double, passes nulls
# through, and avoids Python serialisation.
weather = weather.withColumn(
    'temperature_data',
    F.round((F.col('temperature_data') - 273.15) * (9 / 5) + 32, 2))

In [11]:
# Both tables carry lat/lon; prefix them so the join result keeps both sets unambiguously.
trip = trip.withColumnRenamed('lon', 'trip_long').withColumnRenamed('lat', 'trip_lat')
weather = weather.withColumnRenamed('lon', 'weather_long').withColumnRenamed('lat', 'weather_lat')

In [26]:
# Attach weather to each trip point by (Pacific date, hour, geohash cell). `x` is a weather
# column, so the filter drops points with no matching weather row (or a null x).
trip2 = (trip
         .select('vehicle_id', 'pst_timestamp', 'date', 'time', 'trip_lat', 'trip_long', 'encoded_ll')
         .join(weather, on=['date', 'time', 'encoded_ll'], how='left')
         .where('x is not null')
         .withColumn('vehicle_id', F.col('vehicle_id').cast('string')))

In [30]:
def map_weather(mx):
    """Precipitation type from a temperature in °F (the unit temperature_data is converted to).

    Returns 'SNOW' below 27, 'FREEZING RAIN' from 27 up to (but excluding) 32, and 'RAIN' at
    32 and above. Returns None for a missing or NaN temperature instead of raising or guessing.
    """
    # `mx != mx` is true only for NaN
    if mx is None or mx != mx:
        return None
    if mx < 27:
        return 'SNOW'
    if mx < 32:
        return 'FREEZING RAIN'
    return 'RAIN'

# Label each point's precipitation type from its temperature (string-valued UDF).
udf3 = F.udf(map_weather, StringType())
trip2 = trip2.withColumn('weather_label', udf3('temperature_data'))

In [31]:
# Precipitation intensity label.
def map_per(mx):
    """Map a precipitation amount to 'LIGHT' (< 2.5), 'MODERATE' (2.5 to < 7.6) or 'HEAVY' (>= 7.6).

    Returns None for a missing value, NaN, or no precipitation (mx <= 0), so dry readings are
    not counted as light rain/snow downstream.
    NOTE(review): the thresholds presumably assume mm/h — confirm the unit of precipitation_data.
    """
    # `not mx > 0` also catches NaN, for which every comparison is False
    if mx is None or not mx > 0:
        return None
    if mx < 2.5:
        return 'LIGHT'
    if mx < 7.6:
        return 'MODERATE'
    return 'HEAVY'

# Label each point's precipitation intensity (string-valued UDF).
udf4 = F.udf(map_per, StringType())
trip2 = trip2.withColumn('perception_label', udf4('precipitation_data'))

In [32]:
schema = StructType([
    StructField("vehicle_id", StringType()),
    StructField("pst_timestamp", TimestampType()),
    StructField("total_light_rain_driving_km", IntegerType()),
    StructField("total_light_freezing_rain_driving_km", IntegerType()),
    StructField("total_light_snow_driving_km", IntegerType()),
    StructField("total_moderate_rain_driving_km", IntegerType()),
    StructField("total_moderate_freezing_rain_driving_km", IntegerType()),
    StructField("total_moderate_snow_driving_km", IntegerType()),
    StructField("total_heavy_rain_driving_km", IntegerType()),
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute(pdf):
    """Weekly kilometres driven per (precipitation intensity, precipitation type) per vehicle.

    Grouped-map pandas UDF. `pdf` holds the trip2 rows of one vehicle_id (`vehicle_id`,
    `pst_timestamp`, `trip_lat`, `trip_long`, `perception_label`, `weather_label`, ...).
    Returns one row per (vehicle_id, week) with the columns of `schema`. `pst_timestamp`
    holds the Monday that starts each week [Monday, next Monday), and distances are rounded
    to whole kilometres.

    A condition's distance is the haversine length of the steps between time-consecutive
    readings that are both in that condition. Separate episodes of the same condition are
    therefore not joined by a straight line.
    NOTE(review): trip2 carries no trip_id, so a step between two trips of the same week
    counts when both ends share a condition.
    """
    # ((perception_label, weather_label), output column), in `schema` column order
    conditions = [
        (('LIGHT', 'RAIN'), 'total_light_rain_driving_km'),
        (('LIGHT', 'FREEZING RAIN'), 'total_light_freezing_rain_driving_km'),
        (('LIGHT', 'SNOW'), 'total_light_snow_driving_km'),
        (('MODERATE', 'RAIN'), 'total_moderate_rain_driving_km'),
        (('MODERATE', 'FREEZING RAIN'), 'total_moderate_freezing_rain_driving_km'),
        (('MODERATE', 'SNOW'), 'total_moderate_snow_driving_km'),
        (('HEAVY', 'RAIN'), 'total_heavy_rain_driving_km'),
    ]
    column_by_condition = dict(conditions)

    def inner_compute(week):
        """Distances for the time-ordered readings of a single vehicle-week, as a Series."""
        points = list(zip(week['trip_lat'], week['trip_long']))
        labels = list(zip(week['perception_label'], week['weather_label']))
        totals = {column: 0.0 for _, column in conditions}
        for i in range(1, len(points)):
            column = column_by_condition.get(labels[i])
            # only count a step when both of its readings are in the same tracked condition
            if column is not None and labels[i] == labels[i - 1]:
                totals[column] += haversine(points[i - 1], points[i], unit='km')
        return pd.Series({column: round(total) for column, total in totals.items()})

    # Spark does not guarantee row order inside a group; steps must follow time order.
    # groupby keeps this order within each group.
    ordered = pdf.sort_values('pst_timestamp', kind='stable')

    # 'W-MON' bins default to right-closed, right-labelled (week *ending* Monday);
    # left/left gives [Monday, next Monday) labelled by its starting Monday.
    weekly = pd.Grouper(key='pst_timestamp', freq='W-MON', label='left', closed='left')
    df = ordered.groupby(['vehicle_id', weekly]).apply(inner_compute).reset_index()
    return df
    

In [34]:
# Weekly weather-distance features per vehicle. The UDF's week label is renamed and rendered
# as a calendar date: 'yyyy'/'dd' are calendar year / day-of-month, whereas 'YYYY'/'DD' would
# be week-based year / day-of-year.
trip3 = (trip2.groupby('vehicle_id').apply(compute)
         .withColumnRenamed('pst_timestamp', 'week_start_date')
         .withColumn('week_start_date', F.date_format('week_start_date', 'yyyy-MM-dd'))
         .orderBy('vehicle_id', 'week_start_date'))

In [45]:
# Write one sorted CSV file. coalesce(1) keeps the preceding orderBy ordering, whereas
# repartition(1) reshuffles rows. mode('overwrite') makes the cell re-runnable.
trip3.coalesce(1).write.mode('overwrite').csv('./weather_features', header=True)

In [44]:
# Cleanup only: deletes the ./weather_features output written by the cell above.
# NOTE(review): its execution count (44) is lower than the write cell's (45), so it was run
# *before* the write; on Restart & Run All it runs after the write and deletes the result.
! rm -fr weather_features/