##ACRTA road taxation data engineering

Astro City Road transport authority (ACRTA) in US have come up with an idea to use car registration renewal charges to provide indirect incentives to safe drivers. Also, providing subsidies to certain areas as per the extreme climatic conditions in terms of heavy snow or rain.

ACRTA has contacted us to perform a quantitative study and design a prediction model to support the aforementioned applications.

We, being a part of the data engineering team, are working continuously with the business stakeholders as well as data scientists to create features around these scenarios.

Problem statement that we have been provided is to “Develop inputs for a model that predicts the chances of having a vehicle accident based on driving conditions. This model will help the transport authority to understand risk patterns and act upon them.”

This output then would be utilized so as to come up for a risk-based taxation on different drivers and locations as per crash-prone weather conditions.

Use cases would be –
1. Imposing “unsafe driving tax” on drivers to provide a positive feedback loop which may be revisited every year by looking at the past year trip data based on the driving patterns.
2. Lower the tax in the regions where the climatic conditions lead them to become a crash-prone site

##Data Description
1. Drive Data (Connected car data) – Data coming from the car-mounted devices, which provides you with the car statistics every second. This information will include – Speed, acceleration, engine temperature and other car statistics.
2. Trip – Parameters associated with location of car such as lattitude, longitude, altitude and other similar parameters.
3. Weather – Weather condition at different latitude & longitude during different times each day.
4. Vehicle Specifications – Different vehicle technical specifications which comes from the manufacturer of the car.

In [3]:
# File location and type
file_location_drive = "/FileStore/tables/drive/"
file_location_weather = "/FileStore/tables/weather/"
file_location_trip = "/FileStore/tables/trip/"
file_location_vehicle = "/FileStore/tables/vehicle/"
file_type = "parquet"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_drive = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location_drive)

df_weather = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location_weather)

df_trip = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location_trip)

df_vehicle = spark.read.format("csv") \
  .option("inferSchema", infer_schema) \
  .option("header", "true") \
  .option("sep", delimiter) \
  .load(file_location_vehicle)



In [4]:
# Create a view or table
temp_table_drive = "drive_temp"
temp_table_weather = "weather_temp"
temp_table_trip = "trip_temp"
temp_table_vehicle = "vehicle_temp"

df_drive.createOrReplaceTempView(temp_table_drive)
df_weather.createOrReplaceTempView(temp_table_weather)
df_trip.createOrReplaceTempView(temp_table_trip)
df_vehicle.createOrReplaceTempView(temp_table_vehicle)

In [5]:

# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.

permanent_table_drive = "drive_tb"
permanent_table_weather = "weather_tb"
permanent_table_trip = "trip_tb"
permanent_table_vehicle = "vehicle_tb"


df_drive.write.format("delta").partitionBy("vehicle_id","trip_id").saveAsTable(permanent_table_drive)
df_weather.write.format("delta").saveAsTable(permanent_table_weather)
df_trip.write.format("delta").partitionBy("vehicle_id").saveAsTable(permanent_table_trip)
df_vehicle.write.format("delta").partitionBy("vehicle_id").saveAsTable(permanent_table_vehicle)

In [6]:
%sql
--Optimise delta files
OPTIMIZE `weather_tb`;
OPTIMIZE `drive_tb`;
OPTIMIZE `trip_tb`;
OPTIMIZE `vehicle_tb`;


path
""


Geocode function to convert latitude an dlongitude  of a loctaion to geocode to precision 5.

In [8]:
def geocode(lat,lon,p):
  import Geohash
  return Geohash.encode(lat, lon, p)
spark.udf.register("geocodeWithPython", geocode)

Haversine function to calculate distnce between two places in kms.

In [10]:
def haversine(geocode_1,geocode_2):
  from haversine import haversine, Unit
  import Geohash
  return haversine(Geohash.decode(geocode_1), Geohash.decode(geocode_2), 'km')
spark.udf.register("haversineWithPython", haversine)

Data merged from drive,vehicle,weather and trip tables for further analysis

In [12]:
%sql
CREATE TABLE merge_tb2
COMMENT 'This table is created with existing data'
  AS 
 select 
  cast(d.vehicle_id as int),
  cast(v.year as int) as vehicle_year,
  v.make,
  v.model,
  cast(v.drivetrain as int),
  cast(v.max_torque as int),
  cast(v.max_horsepower as int),
  cast(v.max_horsepower_rpm as int),
  cast(v.max_torque_rpm as int),
  cast(v.engine_displacement as int),
  cast(v.fuel_type as int),
  cast(v.fuel_tank_capacity as int),
  cast(v.fuel_economy_city as int),
  cast(v.fuel_economy_highway as int),
  cast(v.cylinders as int),
  cast(v.forced_induction as int),
  cast(v.device_generation as int),
  to_timestamp(t.datetime,"PST") as trip_datetime,
  date_format(to_timestamp(t.datetime,"PST"),'E') as trip_day,
  date_sub(cast(to_timestamp(t.datetime,"PST") as date),dayofweek(cast(to_timestamp(t.datetime,"PST") as date))-2) as week_start_date,
  cast(t.lat as float) as trip_latitude,
  cast(t.long as float) as trip_longitude,
  --geocodeWithPython(t.lat,t.long,5) as trip_geocode,
  cast(t.velocity as int) as trip_velocity_kmh,
  cast(t.velocity*5/18 as float) as trip_velocity_ms,
  t.trip_id,
  d.accel_x,
  d.accel_y,
  d.accel_z,
  cast(d.engine_coolant_temp as int),
  cast(d.eng_load as int),
  cast(d.fuel_level as int),
  cast(d.iat as int),
  cast(rpm as int),
  cast(d.rpm/60 as float) as rps,
  cast(d.velocity as int) as drive_velocity,
  ((d.eng_load/ 255)*v.max_torque*(d.RPM/5252)) as active_horsepower,
  (((d.eng_load/ 255)*v.max_torque*(d.RPM/5252))/v.Max_Horsepower) as horsepower_utilization,
   (d.Eng_load/255) as torque_utilization,
   (d.RPM /v.max_horsepower_rpm) as rpm_utilization,
   w.x,
   w.y,
   cast(w.date as date) as weather_date,
   cast(substr(w.time,1,2) as tinyint) as weather_time,
   cast(w.lat as float) as weather_lat,
   cast(w.lon as float) as weather_lon,
   --geocodeWithPython(w.lat,w.lon,5) as weather_geocode,
   cast(w.temperature_data as float)as w_temp,
   w.temperature_unit as weather_temp_unit,
   (temperature_data*9/5 - 459.67) as weather_temp,
   cast(w.precipitation_data as float) as weather_precipitation,
   w.precipitation_unit as weather_precipitation_unit,
   cast(w.wind_ew_data as float),
   w.wind_ew_unit,
   cast(w.wind_ns_data as float),
   w.wind_ns_unit
from  drive_tb as d 
full join trip_tb as t
on d.vehicle_id = t.vehicle_id
and d.trip_id = t.trip_id
and d.datetime = t.datetime
full join vehicle_tb as v
on d.vehicle_id = v.vehicle_id
left join weather_tb as w
on geocodeWithPython(w.lat,w.lon,5) = geocodeWithPython(t.lat,t.long,5)
and cast(w.date as date) = cast(to_timestamp(t.datetime,"PST") as date)
and cast(substr(w.time,1,2) as tinyint) = cast(hour(to_timestamp(t.datetime,"PST")) as tinyint)




In [13]:
%sql
select * from merge_tb2 limit 10;

vehicle_id,vehicle_year,make,model,drivetrain,max_torque,max_horsepower,max_horsepower_rpm,max_torque_rpm,engine_displacement,fuel_type,fuel_tank_capacity,fuel_economy_city,fuel_economy_highway,cylinders,forced_induction,device_generation,trip_datetime,trip_day,week_start_date,trip_latitude,trip_longitude,trip_velocity_kmh,trip_velocity_ms,trip_id,accel_x,accel_y,accel_z,engine_coolant_temp,eng_load,fuel_level,iat,rpm,rps,drive_velocity,active_horsepower,horsepower_utilization,torque_utilization,rpm_utilization,x,y,weather_date,weather_time,weather_lat,weather_lon,w_temp,weather_temp_unit,weather_temp,weather_precipitation,weather_precipitation_unit,wind_ew_data,wind_ew_unit,wind_ns_data,wind_ns_unit
1000508,,,,,,,,,,,,,,,,,2017-01-01T13:27:19.000+0000,Sun,2017-01-02,30.8125,-113.98222,82,22.966667,2ffe7b89bb2a428ba1cf45e70e0d7bc0,39.0,91.49,40.0,112,202,129,126,1933,32.227,82,,,0.7956470588235294,,,,,,,,,,,,,,,,
1000508,,,,,,,,,,,,,,,,,2017-01-01T13:27:34.000+0000,Sun,2017-01-02,30.8125,-113.97806,67,18.833334,2ffe7b89bb2a428ba1cf45e70e0d7bc0,40.0,87.04,48.0,101,202,117,121,1925,32.09,67,,,0.7942745098039216,,,,,,,,,,,,,,,,
1000508,,,,,,,,,,,,,,,,,2017-01-01T13:28:27.000+0000,Sun,2017-01-02,30.8125,-113.96333,72,20.055555,2ffe7b89bb2a428ba1cf45e70e0d7bc0,36.0,86.77,50.0,98,201,116,129,1924,32.078667,72,,,0.7894509803921569,,,,,,,,,,,,,,,,
1000508,,,,,,,,,,,,,,,,,2017-01-01T13:28:39.000+0000,Sun,2017-01-02,30.8125,-113.96,53,14.827778,2ffe7b89bb2a428ba1cf45e70e0d7bc0,38.0,91.57,45.0,91,208,120,125,1926,32.105167,53,,,0.8158823529411765,,,,,,,,,,,,,,,,
1000508,,,,,,,,,,,,,,,,,2017-01-01T13:28:28.000+0000,Sun,2017-01-02,30.8125,-113.96306,36,10.194445,2ffe7b89bb2a428ba1cf45e70e0d7bc0,46.0,85.22,45.0,93,204,117,134,1926,32.116165,36,,,0.8025882352941176,,,,,,,,,,,,,,,,
1000508,,,,,,,,,,,,,,,,,2017-01-01T13:27:37.000+0000,Sun,2017-01-02,30.8125,-113.97722,65,18.069445,2ffe7b89bb2a428ba1cf45e70e0d7bc0,44.0,86.75,53.0,105,210,117,136,1923,32.0545,65,,,0.8261960784313725,,,,,,,,,,,,,,,,
1000508,,,,,,,,,,,,,,,,,2017-01-01T13:28:50.000+0000,Sun,2017-01-02,30.8125,-113.95695,71,19.777779,2ffe7b89bb2a428ba1cf45e70e0d7bc0,47.0,83.19,55.0,93,200,125,125,1934,32.246166,71,,,0.7864313725490196,,,,,,,,,,,,,,,,
1000508,,,,,,,,,,,,,,,,,2017-01-01T13:27:00.000+0000,Sun,2017-01-02,30.8125,-113.9875,73,20.527779,2ffe7b89bb2a428ba1cf45e70e0d7bc0,46.0,93.31,50.0,105,201,127,128,1923,32.066166,73,,,0.7910588235294118,,,,,,,,,,,,,,,,
1000508,,,,,,,,,,,,,,,,,2017-01-01T13:27:10.000+0000,Sun,2017-01-02,30.8125,-113.984726,91,25.313889,2ffe7b89bb2a428ba1cf45e70e0d7bc0,42.0,77.3,52.0,101,206,109,126,1935,32.264168,91,,,0.809843137254902,,,,,,,,,,,,,,,,
1000508,,,,,,,,,,,,,,,,,2017-01-01T13:26:54.000+0000,Sun,2017-01-02,30.8125,-113.98917,79,22.041666,2ffe7b89bb2a428ba1cf45e70e0d7bc0,50.0,84.89,56.0,100,198,123,127,1918,31.980167,79,,,0.7776862745098039,,,,,,,,,,,,,,,,


1. Engine Features (file name – engine_features.csv)-
Grain - every vehicle aggregated at week start date(Monday) for the complete week in YYYY-MM-DD format.
Sorted - by Vehicle ID and week_start_Date in ascending manner

Hints:

1. Convert timezone to PST before any calculations
2. All vehicles from drive data should be in the final output even if you do not have specifications (Fill with 0 if specs are not given)
3. Active horsepower - Engine load / 255 * Max Torque * RPM / 5252
4. Horsepower utilization – Active horsepower / Max Horsepower
5. Torque Utilization - calculated as Engine load/ 255
6. RPM Utilization – RPM / Maximum horsepower rpm

In [15]:
%sql
select vehicle_id,
week_start_date,
sum(ft_torque_util_60pct_s) as ft_torque_util_60pct_s,
sum(ft_torque_util_70pct_s) as ft_torque_util_70pct_s,
sum(ft_torque_util_80pct_s) as ft_torque_util_80pct_s,
sum(ft_torque_util_90pct_s) as ft_torque_util_90pct_s,
sum(ft_horsepower_util_50pct_s) as ft_horsepower_util_50pct_s,
sum(ft_horsepower_util_60pct_s) as ft_horsepower_util_60pct_s,
sum(ft_horsepower_util_70pct_s) as ft_horsepower_util_70pct_s,
sum(ft_horsepower_util_80pct_s) as ft_horsepower_util_80pct_s,
sum(ft_rpm_util_50pct_s) as ft_rpm_util_50pct_s,
sum(ft_rpm_util_60pct_s) as ft_rpm_util_60pct_s
from
(select distinct vehicle_id,week_start_date,
case when torque_utilization >= 0.6 and torque_utilization < 0.7 then rps else 0
 end as ft_torque_util_60pct_s,
 case when torque_utilization >= 0.7 and torque_utilization < 0.8 then rps else 0
 end as ft_torque_util_70pct_s,
 case when torque_utilization >= 0.8 and torque_utilization < 0.9 then rps else 0
 end as ft_torque_util_80pct_s,
 case when torque_utilization >= 0.9 and torque_utilization < 1 then rps else 0
 end as ft_torque_util_90pct_s,
 case when horsepower_utilization >= 0.5 and horsepower_utilization < 0.6 then rps else 0
 end as ft_horsepower_util_50pct_s,
 case when horsepower_utilization >= 0.6 and horsepower_utilization < 0.7 then rps else 0
 end as ft_horsepower_util_60pct_s,
 case when horsepower_utilization >= 0.7 and horsepower_utilization < 0.8 then rps else 0
 end as ft_horsepower_util_70pct_s,
 case when horsepower_utilization >= 0.8 and horsepower_utilization < 0.9 then rps else 0
 end as ft_horsepower_util_80pct_s,
 case when rpm_utilization >= 0.5 and rpm_utilization < 0.6 then rps else 0
 end as ft_rpm_util_50pct_s,
 case when rpm_utilization >= 0.6 and rpm_utilization < 0.7 then rps else 0
 end as ft_rpm_util_60pct_s
from merge_tb2)
group by  vehicle_id,week_start_date
order by 1,2;

vehicle_id,week_start_date,ft_torque_util_60pct_s,ft_torque_util_70pct_s,ft_torque_util_80pct_s,ft_torque_util_90pct_s,ft_horsepower_util_50pct_s,ft_horsepower_util_60pct_s,ft_horsepower_util_70pct_s,ft_horsepower_util_80pct_s,ft_rpm_util_50pct_s,ft_rpm_util_60pct_s
1000500,2017-01-02,7446.522682189941,799788.8849372864,717890.3173313141,2702.498672485352,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-01-09,603.0646667480469,208265.81711769104,245259.2092075348,4138.552322387695,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-01-16,2183.144504547119,424270.2265682221,203373.59084892276,4034.7151527404794,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-01-23,0.0,246330.8596343994,160809.58015823364,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-01-30,0.0,80128.05196762085,74484.8286743164,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-02-06,34.91466522216797,66242.09112167358,7289.688175201416,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-02-13,1831.4396667480469,390959.8028488159,265891.13804244995,335.227165222168,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-02-20,35.365501403808594,81043.23531150818,314520.29602622986,5617.92448425293,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-02-27,143.74833297729492,115963.53032302856,54058.48637390137,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1000501,2017-01-02,7236.042531967163,747691.9312934875,799271.3203849792,7586.204982757568,362879.0493793488,941231.429069519,257675.02074432373,0.0,1561785.4991931915,0.0


In [16]:
%sql
/*select vehicle_id,
week_start_date,
sum(ft_torque_util_60pct_s) as ft_torque_util_60pct_s,
sum(ft_torque_util_70pct_s) as ft_torque_util_70pct_s,
sum(ft_torque_util_80pct_s) as ft_torque_util_80pct_s,
sum(ft_torque_util_90pct_s) as ft_torque_util_90pct_s,
sum(ft_horsepower_util_50pct_s) as ft_horsepower_util_50pct_s,
sum(ft_horsepower_util_60pct_s) as ft_horsepower_util_60pct_s,
sum(ft_horsepower_util_70pct_s) as ft_horsepower_util_70pct_s,
sum(ft_horsepower_util_80pct_s) as ft_horsepower_util_80pct_s,
sum(ft_rpm_util_50pct_s) as ft_rpm_util_50pct_s,
sum(ft_rpm_util_60pct_s) as ft_rpm_util_60pct_s
from
(select distinct vehicle_id,
--date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2) as week_start_date,
case when torque_utilization >= 0.6 and torque_utilization < 0.7 then rpf over (partition by vehicle_id,date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2)) else 0
 end as ft_torque_util_60pct_s,
 case when torque_utilization >= 0.7 and torque_utilization < 0.8 then sum(rpm/60) over (partition by vehicle_id,date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2)) else 0
 end as ft_torque_util_70pct_s,
 case when torque_utilization >= 0.8 and torque_utilization < 0.9 then sum(rpm/60) over (partition by vehicle_id,date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2)) else 0
 end as ft_torque_util_80pct_s,
 case when torque_utilization >= 0.9 and torque_utilization < 1 then sum(rpm/60) over (partition by vehicle_id,date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2)) else 0
 end as ft_torque_util_90pct_s,
 case when horsepower_utilization >= 0.5 and horsepower_utilization < 0.6 then sum(rpm/60) over (partition by vehicle_id,date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2)) else 0
 end as ft_horsepower_util_50pct_s,
 case when horsepower_utilization >= 0.6 and horsepower_utilization < 0.7 then sum(rpm/60) over (partition by vehicle_id,date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2)) else 0
 end as ft_horsepower_util_60pct_s,
 case when horsepower_utilization >= 0.7 and horsepower_utilization < 0.8 then sum(rpm/60) over (partition by vehicle_id,date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2)) else 0
 end as ft_horsepower_util_70pct_s,
 case when horsepower_utilization >= 0.8 and horsepower_utilization < 0.9 then sum(rpm/60) over (partition by vehicle_id,date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2)) else 0
 end as ft_horsepower_util_80pct_s,
 case when rpm_utilization >= 0.5 and rpm_utilization < 0.6 then sum(rpm/60) over (partition by vehicle_id,date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2)) else 0
 end as ft_rpm_util_50pct_s,
 case when rpm_utilization >= 0.6 and rpm_utilization < 0.7 then sum(rpm/60) over (partition by vehicle_id,date_sub(cast(trip_datetime as date),dayofweek(cast(trip_datetime as date))-2)) else 0
 end as ft_rpm_util_60pct_s
from merge_tb) 
group by  vehicle_id,week_start_date
order by 1,2*/

vehicle_id,week_start_date,ft_torque_util_60pct_s,ft_torque_util_70pct_s,ft_torque_util_80pct_s,ft_torque_util_90pct_s,ft_horsepower_util_50pct_s,ft_horsepower_util_60pct_s,ft_horsepower_util_70pct_s,ft_horsepower_util_80pct_s,ft_rpm_util_50pct_s,ft_rpm_util_60pct_s
1000500,2017-01-02,6460024.216666334,6460024.216666334,6460024.216666334,6460024.216666334,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-01-09,1231186.666666663,1231186.666666663,1231186.666666663,1231186.666666663,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-01-16,2049032.9500000093,2049032.9500000093,2049032.9500000093,2049032.9500000093,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-01-23,0.0,876196.2000000014,876196.2000000014,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-01-30,0.0,410097.5666666637,410097.5666666637,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-02-06,189570.7333333332,189570.7333333332,189570.7333333332,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-02-13,2745073.200000029,2745073.200000029,2745073.200000029,2745073.200000029,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-02-20,1509533.583333352,1509533.583333352,1509533.583333352,1509533.583333352,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-02-27,490820.3500000068,490820.3500000068,490820.3500000068,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1000501,2017-01-02,5858319.600000277,11716639.200000554,17574958.80000083,11716639.200000554,17574958.80000083,17574958.80000083,11716639.200000554,0.0,46866556.80000222,0.0


2. Drive features(file name – drive_features.csv) -
Grain – Every trip’s aggregated features at a trip id level.

Sorted - by trip_id in ascending manner

Hints:

1. Acceleration m/s is calculated as a change in velocity over time
2. If a vehicle keeps on accelerating continuously over a period of time, please treat them as a single acceleration or deacceleration period.

In [19]:
%sql
select trip_id,
sum(deceleration_cnt) as ft_cnt_vehicle_deaccel_val,
sum(hard_brake_10) as ft_sum_hard_brakes_10_flg_val,
sum(hard_brake_3) as ft_sum_hard_brakes_3_flg_val,
sum(dec_sec) as ft_sum_time_deaccel_val,
sum(acceleration_cnt) as ft_cnt_vehicle_accel_val,
sum(hard_accel_10) as ft_sum_hard_accel_10_flg_val,
sum(hard_accel_3) as ft_sum_hard_accel_3_flg_val,
sum(acc_sec) as ft_sum_time_accel_val
from
(select
trip_id,
case when (is_deceleration - coalesce(lead(is_deceleration) over (partition by trip_id order by trip_datetime),0)) >= 0 then 0 
    else 1 
end as deceleration_cnt,
case when velocity_change <= -10 then 1 else 0 end as hard_brake_10,
case when velocity_change <= -3 and velocity_change > -10 then 1 else 0 end as hard_brake_3,
case when is_deceleration = 1 then unix_timestamp(lead(trip_datetime) over (partition by trip_id order by trip_datetime)) - unix_timestamp(trip_datetime) else 0 end as dec_sec,
case when (is_deceleration - coalesce(lead(is_deceleration) over (partition by trip_id order by trip_datetime),0)) >= 0 then 1 
    else 0 
end as acceleration_cnt,
case when velocity_change >= 10 then 1 else 0 end as hard_accel_10,
case when velocity_change >= 3 and velocity_change < 10 then 1 else 0 end as hard_accel_3,
case when is_deceleration = 0 then unix_timestamp(lead(trip_datetime) over (partition by trip_id order by trip_datetime)) - unix_timestamp(trip_datetime) else 0 end as acc_sec
from
(select trip_id,
trip_datetime,
trip_velocity_ms,
coalesce(lag(trip_velocity_ms) OVER (PARTITION BY trip_id order by trip_datetime),0) as lag_velocity,
(trip_velocity_ms) - coalesce(lag(trip_velocity_ms) OVER (PARTITION BY trip_id order by trip_datetime),0) as velocity_change,
case when trip_velocity_ms - coalesce(lag(trip_velocity_ms) OVER (PARTITION BY trip_id order by trip_datetime),0) < 0 then 1 else 0 end as  is_deceleration
from merge_tb2
))
group by trip_id
order by 1




trip_id,ft_cnt_vehicle_deaccel_val,ft_sum_hard_brakes_10_flg_val,ft_sum_hard_brakes_3_flg_val,ft_sum_time_deaccel_val,ft_cnt_vehicle_accel_val,ft_sum_hard_accel_10_flg_val,ft_sum_hard_accel_3_flg_val,ft_sum_time_accel_val
00922df3be5a4589ab385d0c2da2dd81,976,15,563,1413,1923,8,541,1485
00dc31fe55e24d14989c89de4b3b683b,1217,52,939,1852,2473,82,892,1837
0156d21e316d4d8b9d5bf6ccff797bf7,1134,1,219,1745,2300,1,230,1688
01b8a24510cd4e4684d67b96369286e0,210,12,154,311,421,10,160,319
01c2a70c25e5428bb33811ca5eb19270,2375,1,695,3517,4761,1,687,3618
01d4a4efe7d14d11a5f9fa5f40bd8bc3,1029,2,526,1579,2082,5,567,1531
0244f7bc747b41fba6fcd75444736621,185,1,120,277,363,3,122,270
025e2f194863461d9c27b2242007ac09,1961,200,1541,2952,3918,214,1502,2926
02abe724b7a943cdae50c69e4f03cc26,3186,52,2047,4781,6362,52,2028,4766
02c51e56cc484711b218d3d01196687a,1976,5,1026,2952,3931,8,986,2954


Weather features (file name – weather_features.csv)

grain – Every vehicle detail should be aggregated at a week start date.
Sorted - by vehicle_id and week_start_date in ascending manner

 

Hint: convert time zone to PST before any calculations

Assumptions & Hints–

1. Weather data is already in PST and may not need any timezone conversion. You may consider the weather data to be constant for complete hour basis. For example- if the temperature is given to be 284.51 for 2017-02-14 19:00:00, it would be the same for time 2017-02-14 19:15:45 as well.
2. Haversine formula must be utilized to measure the distance between any 2 consecutive points in between the trips.
3. Matches in between datasets must be on geohash precision point 5.

In [21]:
%sql
select
vehicle_id,
week_start_date,
sum(total_light_rain_driving_km) as total_light_rain_driving_km,
sum(total_light_freezing_rain_driving_km) as total_light_freezing_rain_driving_km,
sum(total_light_snow_driving_km) as total_light_snow_driving_km,
sum(total_moderate_rain_driving_km) as total_moderate_rain_driving_km,
sum(total_moderate_freezing_rain_driving_km) as total_moderate_freezing_rain_driving_km,
sum(total_moderate_snow_driving_km) as total_moderate_snow_driving_km,
sum(total_heavy_rain_driving_km) as total_heavy_rain_driving_km
from
(select 
vehicle_id,
week_start_date,
case when prec_temp = 'light rain' then haversine_distance else 0 end as  total_light_rain_driving_km,
case when prec_temp = 'light freezing rain' then haversine_distance else 0 end as  total_light_freezing_rain_driving_km,
case when prec_temp = 'light snow' then haversine_distance else 0 end as  total_light_snow_driving_km,
case when prec_temp = 'moderate rain' then haversine_distance else 0 end as  total_moderate_rain_driving_km,
case when prec_temp = 'moderate freezing rain' then haversine_distance else 0 end as  total_moderate_freezing_rain_driving_km,
case when prec_temp = 'moderate snow' then haversine_distance else 0 end as  total_moderate_snow_driving_km,
case when prec_temp = 'heavy rain' then haversine_distance else 0 end as  total_heavy_rain_driving_km
from
(select distinct
vehicle_id,
week_start_date,
trip_id,
haversineWithPython(geocodeWithPython(trip_latitude,trip_longitude,5),coalesce(lead(geocodeWithPython(trip_latitude,trip_longitude,5)) OVER (PARTITION BY trip_id order by trip_datetime),geocodeWithPython(trip_latitude,trip_longitude,5))) as haversine_distance,
case when weather_precipitation >= 0 and weather_precipitation <= 2.5 then 
      case when weather_temp <= 27 then 'light snow'
      when weather_temp > 27 and weather_temp <= 32 then 'light freezing rain'
      when weather_temp > 32 then 'light rain' end
  when weather_precipitation > 2.5 and weather_precipitation <= 7.6 then 
      case when weather_temp <= 27 then 'moderate snow'
      when weather_temp > 27 and weather_temp <= 32 then 'moderate freezing rain'
      when weather_temp > 32 then 'moderate rain' end
  when weather_precipitation > 7.6 then 
      case when weather_temp <= 27 then 'heavy snow'
      when weather_temp > 27 and weather_temp <= 32 then 'heavy freezing rain'
      when weather_temp > 32 then 'heavy rain' end
  end as prec_temp
from merge_tb2))
group by vehicle_id,week_start_date
order by 1,2



vehicle_id,week_start_date,total_light_rain_driving_km,total_light_freezing_rain_driving_km,total_light_snow_driving_km,total_moderate_rain_driving_km,total_moderate_freezing_rain_driving_km,total_moderate_snow_driving_km,total_heavy_rain_driving_km
1000500,2017-01-02,96.71087712920811,164.6319251766542,62.06592279429491,70.91633135150117,110.21905091586324,4.141360979397352,34.81520411135439
1000500,2017-01-09,0.0,37.62552114983336,44.45569641710391,0.0,0.0,0.0,0.0
1000500,2017-01-16,70.68274994544375,17.06319864336898,0.0,32.29638272401055,8.505171230706276,0.0,18.325790928704656
1000500,2017-01-23,33.564216591958115,16.77312455080447,0.0,24.43251274662704,0.0,0.0,0.0
1000500,2017-01-30,13.32444640254036,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-02-06,8.415163353274881,0.0,0.0,0.0,0.0,0.0,0.0
1000500,2017-02-13,40.51154078931313,0.0,0.0,63.441812638115145,4.886502549324983,0.0,36.254718251341394
1000500,2017-02-20,33.693678225460985,4.180658318095278,0.0,22.371922539774296,0.0,0.0,37.15505344660702
1000500,2017-02-27,16.784364844525765,0.0,0.0,0.0,0.0,0.0,0.0
1000501,2017-01-02,94.52581478190956,143.28034205341666,42.51991259699429,36.97767608234134,89.44274684120528,18.083455401451182,12.662577414051446


In [22]:
import spark_df_profiling
profile = spark_df_profiling.ProfileReport(sqlContext.sql("SELECT * from merge_tb2"))
profile.to_file(outputfile="/tmp/profilingReport.html")