In [1]:
import h3
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import col, lit, udf, acos, cos, sin, radians, rank, element_at
from pyspark.sql.types import StringType, FloatType, BooleanType, ArrayType, DoubleType
import pandas as pd

In [2]:
import os
import data_preparation_faster
from data_preparation_faster import prepare_triplogs, prepare_tripdatas, get_clean_triplogs, get_clean_tripdatas

In [3]:
data_dir = "../2020_11_8_to_2020_11_14"

In [4]:
!ls ../2020_11_8_to_2020_11_14/CMS

intersectionstatusreport.csv
intersectionstatusreport_data_dictionary.docx
opticomdevicelog.csv
opticomdevicelog_data_dictonary.docx


In [5]:
tripdatas_df = pd.read_csv(os.path.join(data_dir, "CVP/tripdatas.csv"))

In [6]:
triplogs_df = pd.read_csv(os.path.join(data_dir, "CVP/triplogs.csv"))

In [7]:
triplogs_df = prepare_triplogs(triplogs_df)
tripdatas_df = prepare_tripdatas(tripdatas_df)

# takes about 4 minutes to run

tripdatas_df = get_clean_tripdatas(tripdatas_df) # cleaning tripdatas first is important
tripdatas_df_good = tripdatas_df[tripdatas_df["is_good_for_study"]]
triplogs_df = get_clean_triplogs(triplogs_df, tripdatas_df_good)
triplogs_df_good = triplogs_df[triplogs_df["is_good_for_study"]]

drop invalid
drop starttime >= endtime
drop endstatus not completed
drop normal tspmode
drop negative duration
drop <=70% stops hit
drop <=70% breadcrumbs
drop duplicated rows
drop tspon no tsp requests
drop 3 stds anomalies


In [8]:
intersection_status_report_df = pd.read_csv(os.path.join(data_dir, "CMS/intersectionstatusreport.csv"))

In [9]:
triplogs_df_good[["is_good_for_study"]].head()

Unnamed: 0,is_good_for_study
1,True
3,True
4,True
5,True
14,True


In [10]:
import os

In [11]:
def dist(lat_x, long_x, lat_y, long_y):
    return acos(
        sin(radians(lat_x)) * sin(radians(lat_y)) +
        cos(radians(lat_x)) * cos(radians(lat_y)) *
            cos(radians(long_x) - radians(long_y))
    ) * lit(6371.0 * 1000) # in meters


In [12]:
import math 

def dist_simple(lat_x, long_x, lat_y, long_y):
    # no spark used here
    return math.acos(
        math.sin(math.radians(lat_x)) * math.sin(math.radians(lat_y)) +
        math.cos(math.radians(lat_x)) * math.cos(math.radians(lat_y)) *
            math.cos(math.radians(long_x) - math.radians(long_y))
    ) * 6371.0 * 1000 # in meters


In [13]:
geo_to_h3_udf = udf(h3.geo_to_h3, StringType())

In [14]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config('spark.driver.memory','3G') \
    .getOrCreate()

In [15]:
# old read

In [16]:
# intersection_status_report = spark.read.parquet("./source/intersection_status_report/2021-05-01")

In [17]:
# trip_logs = spark.read.parquet("./source/triplogs/05-01-2021").drop('routeName')

In [18]:
# trip_datas = spark.read.parquet("./source/tripdatas/05-01-2021")

In [19]:
####

In [20]:
# new read

In [21]:
intersection_status_report_df[['latitude', 'longitude']] = intersection_status_report_df[['latitude', 'longitude']].astype(str)

In [22]:
intersection_status_report_df.columns

Index(['deviceid', 'locationid', 'locationname', 'device',
       'applicationfirmwareversion', 'lastlogretrieved', 'latitude',
       'longitude', 'cha', 'chb', 'chc', 'chd', 'status', 'details',
       'date_ran'],
      dtype='object')

In [23]:
intersection_status_report_df = intersection_status_report_df[['status','locationid', 'locationname', 'latitude', 'longitude']]

In [24]:
intersection_status_report = spark.createDataFrame(intersection_status_report_df)

In [25]:
trip_logs = spark.createDataFrame(triplogs_df_good[['deviceid', 'direction', 'duration', 'logid', 'routename', 'tripid', 'tspmode', 'uid', 'is_good_for_study']])

In [26]:
del tripdatas_df

In [27]:
tripdatas_df_good[['deviceid',
       'event', 'loc', 'logid', 'mph', 'routename',
       'stopname', 'time', 'date_ran',
       'uid', 'is_good_for_study']].to_parquet("tripdatas_df_good_pq")

In [28]:
trip_datas = spark.read.parquet("tripdatas_df_good_pq")

In [29]:
del tripdatas_df_good
del triplogs_df
del triplogs_df_good

In [30]:
trip_datas = trip_datas \
    .withColumnRenamed('logid', 'logID') \
    .withColumnRenamed('deviceid', 'deviceID')

trip_logs = trip_logs \
    .withColumnRenamed('logid', 'logID') \
    .withColumnRenamed('deviceid', 'deviceID').drop('routename')

In [31]:
####

In [32]:
trip_log_datas = trip_logs.join(trip_datas, on=['logID', 'deviceID'], how='inner')

In [33]:
intersections = intersection_status_report \
    .filter("status <> 'Error'") \
    .withColumnRenamed('latitude', 'actual_lat') \
    .withColumnRenamed('longitude', 'actual_lon') \
    .withColumnRenamed('locationname', 'name') \
    .withColumnRenamed('locationid', 'id') \
    .withColumn('actual_lat', col('actual_lat').cast("double")) \
    .withColumn('actual_lon', col('actual_lon').cast("double")) \
    .withColumn('zone_id', geo_to_h3_udf('actual_lat', 'actual_lon', lit(9))) \
    .select('id', 'name', 'actual_lat', 'actual_lon', 'zone_id') \
    .distinct()

In [34]:
def loc_to_coords(loc):
    coords = [float(x) for x in loc[loc.index('[') + 1: loc.index(']')].split(',')]
    return coords

loc_to_coords_udf = udf(loc_to_coords, ArrayType(DoubleType()))

In [89]:
from pyspark.sql.functions import row_number

trip_log_datas = trip_log_datas \
    .withColumn('loc_coords', loc_to_coords_udf('loc'))

zoned_trip_data = trip_log_datas \
    .withColumnRenamed('tspMode','tspmode') \
    .withColumnRenamed('routeName', 'routename') \
    .withColumn('lon', element_at(col("loc_coords"), 1)) \
    .withColumn('lat', element_at(col("loc_coords"), 2)) \
    .select('lon', 'lat', 'time', 'routeName', 'direction', 'tripID', 'logID', 'deviceID', 'event', 'mph', 'tspMode') \
    .withColumn('zone_id', geo_to_h3_udf('lat', 'lon', lit(9))) \
    .withColumn('breadcrumb_index', row_number().over(Window.partitionBy(['logID', 'deviceID']).orderBy('time')))

## Detect tripdatas closest to intersection (tripdata points crossing intersection)

In [90]:
# max distance (m) from breadcrumb to intersection, bus to be considered possibly crossing this intersection
# it can be thought of as a gps sideways threshold
dist_to_be_crossing_thresh = 45

In [91]:
zoned_trip_data_intersection = zoned_trip_data.join(intersections, on="zone_id")

zoned_trip_data_intersection = zoned_trip_data_intersection \
    .withColumn("dist_to_intersection", dist("lat", "lon", "actual_lat", "actual_lon")) \
    .withColumn("rank_dist", row_number().over(Window.partitionBy(["logID", "deviceID", "id", "routeName", "direction"]).orderBy("dist_to_intersection")))

trip_datas_crossing_intersections = zoned_trip_data_intersection.filter("rank_dist == 1") \
    .filter(zoned_trip_data_intersection.dist_to_intersection < dist_to_be_crossing_thresh).orderBy("time")

In [92]:
# save trip_datas_crossing_intersections to dict to make it work faster

from collections import defaultdict
log_id_device_id_to_trip_datas_crossing_intersections = defaultdict(list)

for row in trip_datas_crossing_intersections.collect():
    log_id_device_id_to_trip_datas_crossing_intersections[str(row["logID"]) + "_" + str(row["deviceID"])].append(row)


In [93]:
# save breadcrumbs with 'stop arrive', 'stop depart' to dict to later detect breadcrumbs between those events

log_id_device_id_to_trip_datas_arrive_depart = defaultdict(list)

for row in zoned_trip_data.filter("event IN ('stop arrive', 'stop depart')").collect():
    log_id_device_id_to_trip_datas_arrive_depart[row['logID'] + "_" + row['deviceID']].append(row)

In [94]:
# sequential stop arrive stop depart indexes saved in dict
log_id_device_id_to_stoparrive_stopdepart_indexes = defaultdict(list)

for key, val in log_id_device_id_to_trip_datas_arrive_depart.items():
    stop_arrive_depart_breadcrumbs = val
    cur_stop_arrive_stopdepart_indexes = []
    for ind, row in enumerate(stop_arrive_depart_breadcrumbs):
        if ind + 1 < len(stop_arrive_depart_breadcrumbs) and (row['event'] == "stop arrive") and (stop_arrive_depart_breadcrumbs[ind + 1]['event'] == "stop depart"):
            cur_stop_arrive_stopdepart_indexes.append({'arrive': row["breadcrumb_index"], 'depart': stop_arrive_depart_breadcrumbs[ind + 1]["breadcrumb_index"]})
    
    log_id_device_id_to_stoparrive_stopdepart_indexes[key] = cur_stop_arrive_stopdepart_indexes
    

## Upcoming intersection detection

In [95]:
import json

def get_upcoming_intersection_factory(log_id_device_id_to_trip_datas_crossing_intersections_broadcasted):
    
    def get_upcoming_intersection(log_id, device_id, lat, lon, breadcrumb_index, dist_threshold):
        """
        log_id - log_id to identify trip in combination with device_id
        device_id - device_id identify trip in combination with log_id
        breadcrumb_index - breadcrumb_index attribute for given breadcrumb (generated at previous steps)
        log_id_device_id_to_trip_datas_crossing_intersections - dict where keys are strings [logID]_[deviceID]
        and values are lists of rows in tripdatas where crossing the intersection happened
        dist_threshold - max distance of bus to intersection so it is considered to be affected by that intersection

        returns: string representation of dict
        
        if upcoming_intersection is found (such that goes after breadcrumb within threshold distance),
        its info is given in intersection_loc_id, dist_to_upcoming_intersection fields
        otherwise reason not found explained in note_upcoming field
        
        intersection_loc_id - id of location of upcoming intersection if found
        dist_to_upcoming_intersection - distance in m to upcoming intersection
        note_upcoming - string explaining why an upcoming intersection is considered to be not found
        """
        # take all trip_datas_crossing_intersections for given trip
        cur_trip_datas_crossing_intersections = log_id_device_id_to_trip_datas_crossing_intersections_broadcasted.value.get(log_id + "_" + device_id)
        if cur_trip_datas_crossing_intersections:
            # leave only upcoming (with larger index than current) trip_datas_crossing_intersections with valid distace threshold
            cur_trip_datas_crossing_intersections_filtered = list(filter(lambda x: x["breadcrumb_index"] > breadcrumb_index, cur_trip_datas_crossing_intersections))
            if cur_trip_datas_crossing_intersections_filtered:
                # if there are several such intersections, take closest by time
                breadcrumb_crossing_intersection = min(cur_trip_datas_crossing_intersections_filtered, key=lambda x : x["breadcrumb_index"])
                dist_to_upcoming_intersection = dist_simple(breadcrumb_crossing_intersection["actual_lat"], breadcrumb_crossing_intersection["actual_lon"] ,lat ,lon)
                if dist_to_upcoming_intersection < dist_threshold:
                    return json.dumps({"intersection_loc_id": breadcrumb_crossing_intersection["id"], "dist_to_upcoming_intersection": dist_to_upcoming_intersection, "note_upcoming": None})
                else:
                    return json.dumps({"intersection_loc_id": None, "dist_to_upcoming_intersection": None, "note_upcoming": "distance to upcoming intersection: " + str(dist_to_upcoming_intersection) + " >= " + str(dist_threshold) + " \n potential upcoming id: " + str(breadcrumb_crossing_intersection["id"])})
            else:
                return json.dumps({"intersection_loc_id": None ,  "dist_to_upcoming_intersection": None, "note_upcoming": "no upcoming found"})
        return json.dumps({"intersection_loc_id": None ,  "dist_to_upcoming_intersection": None, "note_upcoming": "no crossing intersections on trip"})
    return udf(get_upcoming_intersection)
    
    

In [96]:
log_id_device_id_to_trip_datas_crossing_intersections_broadcasted = spark.sparkContext.broadcast(log_id_device_id_to_trip_datas_crossing_intersections)

In [97]:
get_upcoming_intersection_udf = get_upcoming_intersection_factory(log_id_device_id_to_trip_datas_crossing_intersections_broadcasted)

In [98]:
# max distance to upcoming intersection for it to be considered to possibly affect (cause delay) the bus
dist_threshold = 70

In [99]:
zoned_trip_data_intersection = zoned_trip_data_intersection \
    .withColumn('upcoming_intersection', get_upcoming_intersection_udf('logID', 'deviceID', 'lat', 'lon', 'breadcrumb_index', lit(dist_threshold)))

In [100]:
def upcoming_intersection_str_to_loc_id(upcoming_intersection):
    upcoming_intersection_val = json.loads(upcoming_intersection)
    return upcoming_intersection_val["intersection_loc_id"]

def upcoming_intersection_str_to_dist(upcoming_intersection):
    upcoming_intersection_val = json.loads(upcoming_intersection)
    return upcoming_intersection_val["dist_to_upcoming_intersection"]

def upcoming_intersection_str_to_note_upcoming(upcoming_intersection):
    upcoming_intersection_val = json.loads(upcoming_intersection)
    return upcoming_intersection_val["note_upcoming"]

upcoming_intersection_str_to_loc_id_udf = udf(upcoming_intersection_str_to_loc_id, StringType())
upcoming_intersection_str_to_dist_udf = udf(upcoming_intersection_str_to_dist, FloatType())
upcoming_intersection_str_to_note_upcoming_udf = udf(upcoming_intersection_str_to_note_upcoming, StringType())

In [101]:
zoned_trip_data_intersection = zoned_trip_data_intersection \
    .withColumn('upcoming_intersection_loc_id', upcoming_intersection_str_to_loc_id_udf("upcoming_intersection")) \
    .withColumn('dist_to_upcoming_intersection', upcoming_intersection_str_to_dist_udf("upcoming_intersection")) \
    .withColumn('upcoming_intersection_note', upcoming_intersection_str_to_note_upcoming_udf("upcoming_intersection"))


In [48]:
#t = zoned_trip_data_intersection.limit(500).toPandas()

In [49]:
#t.head()

Unnamed: 0,zone_id,lon,lat,time,routeName,direction,tripID,logID,deviceID,event,...,id,name,actual_lat,actual_lon,dist_to_intersection,rank_dist,upcoming_intersection,upcoming_intersection_loc_id,dist_to_upcoming_intersection,upcoming_intersection_note
0,89283082a3bffff,-122.396702,37.790425,2020-11-08 05:00:46.529,14R,inbound,9556430,20201107-t10,4010KM2034,GPS,...,929957C2-546E-435C-A24D-EC5094C9BADF,Market/Front and Fremont,37.7917,-122.398,181.976852,1,"{""intersection_loc_id"": null, ""dist_to_upcomin...",,,distance to upcoming intersection: 97.13435272...
1,89283082a3bffff,-122.396523,37.790543,2020-11-08 05:00:49.024,14R,inbound,9556430,20201107-t10,4010KM2034,GPS,...,929957C2-546E-435C-A24D-EC5094C9BADF,Market/Front and Fremont,37.7917,-122.398,182.698979,2,"{""intersection_loc_id"": null, ""dist_to_upcomin...",,,distance to upcoming intersection: 77.11142587...
2,89283082a3bffff,-122.396583,37.790497,2020-11-08 05:00:47.783,14R,inbound,9556430,20201107-t10,4010KM2034,GPS,...,929957C2-546E-435C-A24D-EC5094C9BADF,Market/Front and Fremont,37.7917,-122.398,182.75723,3,"{""intersection_loc_id"": null, ""dist_to_upcomin...",,,distance to upcoming intersection: 84.42870985...
3,89283082a3bffff,-122.396743,37.79039,2020-11-08 05:00:45.269,14R,inbound,9556430,20201107-t10,4010KM2034,GPS,...,929957C2-546E-435C-A24D-EC5094C9BADF,Market/Front and Fremont,37.7917,-122.398,182.79012,4,"{""intersection_loc_id"": null, ""dist_to_upcomin...",,,distance to upcoming intersection: 102.4662843...
4,89283082a3bffff,-122.396047,37.79112,2020-11-08 05:01:38.751,14R,inbound,9556430,20201107-t10,4010KM2034,GPS,...,929957C2-546E-435C-A24D-EC5094C9BADF,Market/Front and Fremont,37.7917,-122.398,183.358724,5,"{""intersection_loc_id"": null, ""dist_to_upcomin...",,,distance to upcoming intersection: 119.0634740...


In [102]:
def is_delayed_by_intersection_factory(log_id_device_id_to_stoparrive_stopdepart_indexes_broadcasted):
    
    def is_delayed_by_intersection(log_id, device_id, breadcrumb_index, upcoming_intersection_loc_id, mph, mph_threshold):
        """
        log_id - log_id to identify trip in combination with device_id
        device_id - device_id identify trip in combination with log_id
        breadcrumb_index - breadcrumb_index attribute for given breadcrumb (generated at previous steps)
        upcoming_intersection_loc_id - id of upcoming intersection if found else None
        mph - speed of vehicle at that point in mph
        mph_threshold - max speed in mph of bus so it is considered to be delayed (usually low number such as 3-5)
        
        returns: string representation of dict
        
        is_delayed_by_intersection - bool (True if all conditions to be considered delayed by intersection are satisfied)
        is_delayed_by_intersection_note - string explaining why the bus is considered not to be delayed (if is_delayed_by_intersection False)

        """
        
        def is_on_busstop(log_id, device_id, breadcrumb_index):
            # helper function to detect if breadcrumb is between stop arrive, stop depart events
            stoparrive_stopdeparts = log_id_device_id_to_stoparrive_stopdepart_indexes_broadcasted.value.get(log_id + "_" + device_id)
            for arrive_depart in stoparrive_stopdeparts:
                if (breadcrumb_index >= arrive_depart['arrive']) and (breadcrumb_index <= arrive_depart['depart']):
                    return True
            return False
        
        # if no upcoming intersection detected -> False 
        if upcoming_intersection_loc_id is None:
            return json.dumps({"is_delayed_by_intersection": False, "is_delayed_by_intersection_note": "upcoming intersection not found"})
        
        if mph > mph_threshold:
            return json.dumps({"is_delayed_by_intersection": False, "is_delayed_by_intersection_note": "mph > mph_threshold"})
        
        # if mph <= mph_threshold and it is between stop arrive, stop depart events -> False
        if is_on_busstop(log_id, device_id, breadcrumb_index):
            return json.dumps({"is_delayed_by_intersection": False, "is_delayed_by_intersection_note": "on bus stop"})
        
        return json.dumps({"is_delayed_by_intersection": True, "is_delayed_by_intersection_note": None})
    
    return udf(is_delayed_by_intersection)
        

In [103]:
log_id_device_id_to_stoparrive_stopdepart_indexes_broadcasted = spark.sparkContext.broadcast(log_id_device_id_to_stoparrive_stopdepart_indexes)

In [104]:
is_delayed_by_intersection_udf = is_delayed_by_intersection_factory(log_id_device_id_to_stoparrive_stopdepart_indexes_broadcasted)

In [105]:
mph_threshold = 3

In [106]:
zoned_trip_data_intersection = zoned_trip_data_intersection \
    .withColumn('is_delayed_by_intersection_str', is_delayed_by_intersection_udf('logID', 'deviceID', 'breadcrumb_index', 'upcoming_intersection_loc_id' , 'mph', lit(mph_threshold)))


In [107]:
def is_delayed_by_intersection_str_to_bool(is_delayed_by_intersection_str):
    is_delayed_by_intersection_val = json.loads(is_delayed_by_intersection_str)
    return is_delayed_by_intersection_val["is_delayed_by_intersection"]

def is_delayed_by_intersection_str_to_note(is_delayed_by_intersection_str):
    is_delayed_by_intersection_val = json.loads(is_delayed_by_intersection_str)
    return is_delayed_by_intersection_val["is_delayed_by_intersection_note"]

is_delayed_by_intersection_str_to_bool_udf = udf(is_delayed_by_intersection_str_to_bool, BooleanType())
is_delayed_by_intersection_str_to_note_udf = udf(is_delayed_by_intersection_str_to_note)

In [108]:
zoned_trip_data_intersection = zoned_trip_data_intersection \
    .withColumn('is_delayed_by_intersection', is_delayed_by_intersection_str_to_bool_udf("is_delayed_by_intersection_str")) \
    .withColumn('is_delayed_by_intersection_note', is_delayed_by_intersection_str_to_note_udf("is_delayed_by_intersection_str")) \
    .distinct()

In [57]:
#t = zoned_trip_data_intersection.limit(500).toPandas()

In [58]:
#t[["mph","is_delayed_by_intersection_note", "upcoming_intersection_note", "is_delayed_by_intersection"]].head()

Unnamed: 0,mph,is_delayed_by_intersection_note,upcoming_intersection_note,is_delayed_by_intersection
0,0.0,on bus stop,,False
1,15.075218,upcoming intersection not found,distance to upcoming intersection: 512.9754838...,False
2,7.825304,mph > mph_threshold,,False
3,10.011786,upcoming intersection not found,distance to upcoming intersection: 162.9699541...,False
4,13.349048,upcoming intersection not found,distance to upcoming intersection: 380.1526322...,False


In [59]:
t[t["is_delayed_by_intersection"]].head()

Unnamed: 0,zone_id,lon,lat,time,routeName,direction,tripID,logID,deviceID,event,...,actual_lon,dist_to_intersection,rank_dist,upcoming_intersection,upcoming_intersection_loc_id,dist_to_upcoming_intersection,upcoming_intersection_note,is_delayed_by_intersection_str,is_delayed_by_intersection,is_delayed_by_intersection_note
16,89283082a37ffff,-122.404,37.789623,2020-11-10 00:30:26.865,8,inbound,9547103,20201109-t1,4010KJ1053,GPS,...,-122.404,230.914808,54,"{""intersection_loc_id"": ""7E92EEC9-C68D-4329-93...",7E92EEC9-C68D-4329-936E-F68FF58BA019,19.644497,,"{""is_delayed_by_intersection"": true, ""is_delay...",True,
30,8928309564fffff,-122.402877,37.711993,2020-11-10 02:42:39.221,8,inbound,9547135,20201109-t3,4010KJ1053,GPS,...,-122.404,132.462504,80,"{""intersection_loc_id"": ""F49D7B83-2A9D-4603-9C...",F49D7B83-2A9D-4603-9C37-CDC12798D42B,25.412533,,"{""is_delayed_by_intersection"": true, ""is_delay...",True,
68,89283082b43ffff,-122.408647,37.797952,2020-11-11 03:25:58.781,8,outbound,9546992,20201110-t1,4010KJ1133,GPS,...,-122.409,49.929018,18,"{""intersection_loc_id"": ""669BE48F-8F99-41BC-AE...",669BE48F-8F99-41BC-AE17-24E5CE65CE02,49.928928,,"{""is_delayed_by_intersection"": true, ""is_delay...",True,
69,89283082b43ffff,-122.408647,37.797952,2020-11-11 03:26:08.645,8,outbound,9546992,20201110-t1,4010KJ1133,GPS,...,-122.409,49.929018,26,"{""intersection_loc_id"": ""669BE48F-8F99-41BC-AE...",669BE48F-8F99-41BC-AE17-24E5CE65CE02,49.928928,,"{""is_delayed_by_intersection"": true, ""is_delay...",True,
73,89283082a33ffff,-122.401982,37.788595,2020-11-11 00:01:47.113,9R,inbound,9560066,20201110-t11,4010KK1233,GPS,...,-122.402,167.356148,28,"{""intersection_loc_id"": ""EB6F5973-DD11-4ED7-96...",EB6F5973-DD11-4ED7-9607-3755BF831C8C,22.851618,,"{""is_delayed_by_intersection"": true, ""is_delay...",True,


## Delayed time frames detection

In [111]:
tsp_on__routeName_direction_to_delayed_points = defaultdict(list)
tsp_off__routeName_direction_to_delayed_points = defaultdict(list)

for row in zoned_trip_data_intersection.filter(col("is_delayed_by_intersection") == True) \
    .select("upcoming_intersection_loc_id", "time", "is_delayed_by_intersection", "routeName", "direction", "deviceID", "logID", "tspMode"
           ).collect():
    
    res = {"upcoming_intersection_loc_id": row["upcoming_intersection_loc_id"], 
              "logID": row["logID"],
              "deviceID": row["deviceID"],
              "time": row["time"]}
    
    if row["tspMode"] == "alwaysOff":
        tsp_off__routeName_direction_to_delayed_points[row["routeName"] + "_" + row["direction"]].append(res)
    else:
        tsp_on__routeName_direction_to_delayed_points[row["routeName"] + "_" + row["direction"]].append(res)

pd.to_pickle(tsp_on__routeName_direction_to_delayed_points, "tsp_on__routeName_direction_to_delayed_points.pkl")
pd.to_pickle(tsp_off__routeName_direction_to_delayed_points, "tsp_off__routeName_direction_to_delayed_points.pkl")

In [125]:
tsp_on__routeName_direction_to_delayed_points['9R_inbound']

[{'upcoming_intersection_loc_id': '5FE6676A-5DE5-4874-B55C-B393C45E9D50',
  'logID': '20201109-t1',
  'deviceID': '4010KK1177',
  'time': datetime.datetime(2020, 11, 9, 18, 51, 30, 530000)},
 {'upcoming_intersection_loc_id': '5FE6676A-5DE5-4874-B55C-B393C45E9D50',
  'logID': '20201109-t1',
  'deviceID': '4010KK1177',
  'time': datetime.datetime(2020, 11, 9, 18, 51, 31, 771000)},
 {'upcoming_intersection_loc_id': '5FE6676A-5DE5-4874-B55C-B393C45E9D50',
  'logID': '20201109-t1',
  'deviceID': '4010KK1177',
  'time': datetime.datetime(2020, 11, 9, 18, 51, 33, 6000)},
 {'upcoming_intersection_loc_id': '5FE6676A-5DE5-4874-B55C-B393C45E9D50',
  'logID': '20201109-t1',
  'deviceID': '4010KK1177',
  'time': datetime.datetime(2020, 11, 9, 18, 51, 34, 246000)},
 {'upcoming_intersection_loc_id': '5FE6676A-5DE5-4874-B55C-B393C45E9D50',
  'logID': '20201109-t1',
  'deviceID': '4010KK1177',
  'time': datetime.datetime(2020, 11, 9, 18, 51, 35, 496000)},
 {'upcoming_intersection_loc_id': '5FE6676A-5DE

In [157]:
tsp_on__routeName_direction_to_delayed_frames = defaultdict(list)
tsp_off__routeName_direction_to_delayed_frames = defaultdict(list)


max_time_between_sequential = timedelta(seconds = 15)


def delay_points_dict__to_delayed_frames_dict(tsp_routeName_direction_to_delayed_points, max_time_between_sequential):
    tsp_routeName_direction_to_delayed_frames = defaultdict(list)
    for k, val in tsp_routeName_direction_to_delayed_points.items():
        cur_delayed_points = val
        cur_delayed_points.sort(key = lambda x: x['time'])
        cur_upcoming_intersection = cur_delayed_points[0]['upcoming_intersection_loc_id']
        cur_delay = timedelta(seconds = 0)

        for ind, p in enumerate(cur_delayed_points):
            if ind + 1 < len(cur_delayed_points):
                next_p = cur_delayed_points[ind + 1]
                if next_p['upcoming_intersection_loc_id'] == cur_upcoming_intersection:
                    if next_p['time'] - p['time'] < max_time_between_sequential:
                        cur_delay += next_p['time'] - p['time']
                    else:
                        res = {"upcoming_intersection_loc_id": cur_upcoming_intersection,
                                    "logID": p["logID"],
                                    "deviceID": p["deviceID"],
                                    "time_delay": cur_delay}
                        
                        tsp_routeName_direction_to_delayed_frames[k].append(res)
                        cur_delay = timedelta(seconds = 0)
                        
                else:
                    res = {"upcoming_intersection_loc_id": cur_upcoming_intersection,
                                "logID": p["logID"],
                                "deviceID": p["deviceID"],
                                "time_delay": cur_delay}
                    
                    tsp_routeName_direction_to_delayed_frames[k].append(res)
                        
                    cur_delay = timedelta(seconds = 0)
                    cur_upcoming_intersection = next_p['upcoming_intersection_loc_id']
    return tsp_routeName_direction_to_delayed_frames
                        
    

In [158]:
tsp_on__routeName_direction_to_delayed_frames = delay_points_dict__to_delayed_frames_dict(tsp_on__routeName_direction_to_delayed_points,
                                                                                         max_time_between_sequential)

tsp_off__routeName_direction_to_delayed_frames = delay_points_dict__to_delayed_frames_dict(tsp_off__routeName_direction_to_delayed_points,
                                                                                         max_time_between_sequential)

In [161]:
tsp_off__routeName_direction_to_delayed_frames['45_inbound']

[{'upcoming_intersection_loc_id': 'AC91C7D0-3378-41CA-96E8-DF753DB1D11F',
  'logID': '20201107-t5',
  'deviceID': '4010KW2050',
  'time_delay': datetime.timedelta(seconds=7, microseconds=434000)},
 {'upcoming_intersection_loc_id': '04D6A65A-6B04-4C1F-9C22-34F442466717',
  'logID': '20201107-t5',
  'deviceID': '4010KW2050',
  'time_delay': datetime.timedelta(seconds=52, microseconds=186000)},
 {'upcoming_intersection_loc_id': 'BD18DBDC-15F6-444E-8891-F351E5711ECA',
  'logID': '20201107-t5',
  'deviceID': '4010KW2050',
  'time_delay': datetime.timedelta(seconds=24, microseconds=773000)},
 {'upcoming_intersection_loc_id': '669BE48F-8F99-41BC-AE17-24E5CE65CE02',
  'logID': '20201107-t5',
  'deviceID': '4010KW2050',
  'time_delay': datetime.timedelta(seconds=23, microseconds=688000)},
 {'upcoming_intersection_loc_id': '21B3569F-1B62-47A0-848D-4989211C354A',
  'logID': '20201107-t5',
  'deviceID': '4010KW2050',
  'time_delay': datetime.timedelta(seconds=44, microseconds=738000)},
 {'upcoming

In [162]:
pd.to_pickle(tsp_on__routeName_direction_to_delayed_frames, "tsp_on__routeName_direction_to_delayed_frames.pkl")

In [163]:
pd.to_pickle(tsp_off__routeName_direction_to_delayed_frames, "tsp_off__routeName_direction_to_delayed_frames.pkl")