# Sampled using scheduled times

In [1]:
%matplotlib inline

In [15]:
import os
import random
import matplotlib.pyplot as plt
import geopandas as gpd
import pandas as pd
import gtfs_kit as gk
import numpy as np
import osmnx as ox
import networkx as nx
from tqdm import tqdm
from shapely.geometry import Polygon, LineString, Point
import warnings
warnings.filterwarnings('ignore')

In [3]:
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark import SparkConf
import pandas as pd
import pickle
from tqdm import tqdm

In [4]:
spark = SparkSession.builder.config('spark.executor.cores', '8').config('spark.executor.memory', '80g')\
        .config("spark.sql.session.timeZone", "UTC").config('spark.driver.memory', '40g').master("local[26]")\
        .appName("wego-daily").config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC').config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')\
        .config("spark.sql.datetime.java8API.enabled", "true").config("spark.sql.execution.arrow.pyspark.enabled", "true")\
        .config("spark.sql.autoBroadcastJoinThreshold", -1)\
        .config("spark.driver.maxResultSize", 0)\
        .config("spark.shuffle.spill", "true")\
        .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

22/10/04 18:39:59 WARN Utils: Your hostname, scope-vanderbilt resolves to a loopback address: 127.0.1.1; using 10.2.218.69 instead (on interface enp8s0)
22/10/04 18:39:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/04 18:40:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
f = os.path.join('/home/jptalusan/mta_stationing_problem/data/processed/apc_weather_gtfs.parquet')
apcdata = spark.read.load(f)
print(apcdata.columns)
get_columns = ['trip_id', 'transit_date', 'arrival_time', 'block_abbr', 'scheduled_time', 'vehicle_id',
              'stop_sequence', 'stop_id_original', 'load', 'ons', 'offs', 'route_id', 'route_direction_name']
get_str = ", ".join([c for c in get_columns])
apcdata.createOrReplaceTempView("apc")
# # filter subset
query = f"""
       SELECT {get_str}
       FROM apc
       """
print(query)
apcdata = spark.sql(query)
apcdata = apcdata.withColumn("route_id_direction", F.concat_ws('_',apcdata.route_id, apcdata.route_direction_name))
apcdata = apcdata.drop('route_id', 'route_direction_name')
apcdf = apcdata.toPandas()
fp = os.path.join('results/simulator_baseline.pkl')
apcdf.to_pickle(fp)

['gtfs_date', 'dayofweek', 'hour', 'gtfs_route_id', 'gtfs_direction_id', 'stop_id', 'transit_date', 'trip_id', 'day', 'overload_id', 'vehicle_id', 'block_abbr', 'activation_date', 'activation_date_str', 'arrival_time', 'arrival_time_str', 'block_stop_order', 'deactivation_date', 'deactivation_date_str', 'departure_time', 'departure_time_str', 'load', 'load_factor', 'map_latitude', 'map_longitude', 'offs', 'ons', 'pattern_num', 'route_direction_name', 'route_id', 'scheduled_time', 'scheduled_time_str', 'source_pattern_id', 'stop_id_list', 'stop_id_original', 'stop_name', 'stop_sequence', 'stop_sequence_list', 'transit_date_str', 'update_date', 'vehicle_capacity', 'zero_load_at_trip_end', 'count', 'darksky_temperature', 'darksky_humidity', 'darksky_nearest_storm_distance', 'darksky_precipitation_intensity', 'darksky_precipitation_probability', 'darksky_pressure', 'darksky_wind_gust', 'darksky_wind_speed', 'weatherbit_rh', 'weatherbit_wind_spd', 'weatherbit_app_temp', 'weatherbit_temp', '

                                                                                

## Setup map
* Match locations with vertices on the map
* Need to update and add data to `_redo` folder

In [6]:
fp = os.path.join('data', 'shapefiles', "tncounty")
gdf_county = gpd.read_file(fp)
gdf_dav = gdf_county[gdf_county["NAME"] == "Davidson"]
gdf_dav = gdf_dav.to_crs("EPSG:4326")

G = ox.graph_from_polygon(gdf_dav.geometry.iloc[0], network_type='drive')
G = ox.add_edge_speeds(G)
G = ox.add_edge_travel_times(G)

fp = os.path.join('data', 'davidson_graph.graphml')
ox.save_graphml(G, fp)

In [7]:
f = os.path.join('/home/jptalusan/mta_stationing_problem/data/processed/apc_weather_gtfs.parquet')
apcdata = spark.read.load(f)
apcdata.columns
get_columns = ['stop_sequence', 'stop_id_original', 'stop_name', 'map_latitude', 'map_longitude']
get_str = ", ".join([c for c in get_columns])
apcdata.createOrReplaceTempView("apc")

# # filter subset
query = f"""
SELECT {get_str}
FROM apc
"""
apcdata = spark.sql(query)
apcdata = apcdata.drop_duplicates(['stop_id_original'])
apcdf = apcdata.toPandas()

apcdf['nearest_node'] = ox.nearest_nodes(G, apcdf['map_longitude'], apcdf['map_latitude'])
apcdf['nearest_edge'] = ox.nearest_edges(G, apcdf['map_longitude'], apcdf['map_latitude'])

                                                                                

In [8]:
fp = os.path.join('results', 'stops_node_matching.pkl')
apcdf.to_pickle(fp)

## Sanity checking

In [None]:
r = ox.shortest_path(G, 202177835, 9702091174, weight='travel_time')
ox.plot_graph_route(G, r)

In [None]:
cn = apcdf[apcdf['stop_id_original'] == 'MCC4_24']['nearest_node'].values[0]
nn = apcdf[apcdf['stop_id_original'] == 'UNI2AEF']['nearest_node'].values[0]
r = ox.shortest_path(G, cn, nn, weight='travel_time')
cols = ['osmid', 'length', 'travel_time']
attrs = ox.utils_graph.get_route_edge_attributes(G, r)
tt = pd.DataFrame(attrs)[cols]['travel_time'].sum()
tt

In [None]:
pd.DataFrame(attrs)

## Convert APC load data to something static and fast to access (look up)

In [None]:
# import pandas as pd
# fp = os.path.join('results/simulator_baseline.pkl')
# baseline_data = pd.read_pickle(fp)
# baseline_data['dow'] = baseline_data['scheduled_time'].dt.dayofweek
# baseline_data['IsWeekend'] = (baseline_data["scheduled_time"].dt.weekday >= 5).astype('int')
# baseline_data['time'] = baseline_data['scheduled_time'].dt.time
# baseline_data = baseline_data.query("load >= 0 and load <= 100")
# baseline_data = baseline_data.groupby(['route_id_direction', 'block_abbr', 'stop_id_original', 'time', 'IsWeekend']).agg({'load':list})
# fp = os.path.join('results/loads_by_scheduled_time.pkl')
# baseline_data.to_pickle(fp)

## For boarding

In [None]:
# import pandas as pd
# # fp = os.path.join('results/simulator_baseline.pkl')
# baseline_data = pd.read_pickle(fp)
# baseline_data['dow'] = baseline_data['scheduled_time'].dt.dayofweek
# baseline_data['IsWeekend'] = (baseline_data["scheduled_time"].dt.weekday >= 5).astype('int')
# baseline_data['time'] = baseline_data['scheduled_time'].dt.time
# baseline_data = baseline_data.query("ons >= 0 and ons <= 100")
# baseline_data = baseline_data.groupby(['route_id_direction', 'block_abbr', 'stop_id_original', 'time', 'IsWeekend']).agg({'ons':list})
# fp = os.path.join('results/ons_by_scheduled_time.pkl')
# baseline_data.to_pickle(fp)

## For alighting

In [None]:
# import pandas as pd
# fp = os.path.join('results/simulator_baseline.pkl')
# baseline_data = pd.read_pickle(fp)
# baseline_data['dow'] = baseline_data['scheduled_time'].dt.dayofweek
# baseline_data['IsWeekend'] = (baseline_data["scheduled_time"].dt.weekday >= 5).astype('int')
# baseline_data['time'] = baseline_data['scheduled_time'].dt.time
# baseline_data = baseline_data.query("offs >= 0 and offs <= 100")
# baseline_data = baseline_data.groupby(['route_id_direction', 'block_abbr', 'stop_id_original', 'time', 'IsWeekend']).agg({'offs':list})
# fp = os.path.join('results/offs_by_scheduled_time.pkl')
# baseline_data.to_pickle(fp)

## For Travel Times
* for each route_id_dir, block, stop to stop

In [9]:
fp = os.path.join('results/simulator_baseline.pkl')
baseline_data = pd.read_pickle(fp).dropna(subset=['arrival_time']).sort_values(by=['transit_date', 'trip_id', 'stop_sequence'])
baseline_data['dow'] = baseline_data['scheduled_time'].dt.dayofweek
baseline_data['IsWeekend'] = (baseline_data["scheduled_time"].dt.weekday >= 5).astype('int')
baseline_data['time'] = baseline_data['scheduled_time'].dt.time

In [10]:
from multiprocessing import Pool, cpu_count
import datetime as dt

def get_traveltimes(tdf):
    tdf = tdf.sort_values('stop_sequence')
    if len(tdf) <= 2:
        return pd.DataFrame()
    # HACK: This is for correcting the issue that the first stop's arrival_time starts much earlier than the scheduled time

    tdf = tdf.reset_index(drop=True)
    tdf['scheduled_timestamp'] = (tdf['arrival_time'] - dt.datetime(1970,1,1)).dt.total_seconds()
    tdf['time_to_next_stop'] = tdf['scheduled_timestamp'].shift(-1) - tdf['scheduled_timestamp']
    tdf.at[0, 'time_to_next_stop'] = (tdf.at[1, 'arrival_time'] - tdf.at[0, 'scheduled_time']).total_seconds()
    tdf = tdf.drop('scheduled_timestamp', axis=1)
    tdf = tdf.fillna(0)
    return tdf
    
def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pd.concat(ret_list)

out_arr = applyParallel(baseline_data.groupby(['block_abbr', 'route_id_direction', 'transit_date', 'trip_id']), get_traveltimes)
tdf = out_arr.groupby(['route_id_direction', 'block_abbr', 'stop_sequence', 'stop_id_original', 'time', 'IsWeekend']).agg({'time_to_next_stop':list})
# fp = os.path.join('results/travel_time_by_scheduled_time.pkl')
# tdf.to_pickle(fp)

### Sample travel times for a single day or chain

In [2]:
import pandas as pd
import random

fp = os.path.join('results/travel_time_by_scheduled_time.pkl')
tdf = pd.read_pickle(fp)

random.seed(100)

tdf = tdf.reset_index()
tdf['sampled_travel_time'] = tdf.reset_index()['time_to_next_stop'].apply(lambda x: random.choice(x))
tdf['sampled_travel_time'] = abs(tdf['sampled_travel_time'])
fp = os.path.join('results/sampled_travel_times.pkl')
tdf.to_pickle(fp)

In [3]:
tdf

Unnamed: 0,route_id_direction,block_abbr,stop_sequence,stop_id_original,time,IsWeekend,time_to_next_stop,sampled_travel_time
0,14_FROM DOWNTOWN,1400,1,MCC4_20,06:15:00,0,"[362.0, 238.0, 150.0, 228.0, 246.0, 158.0, 276...",328.0
1,14_FROM DOWNTOWN,1400,1,MCC4_20,06:15:00,1,"[264.0, 288.0, 204.0, 264.0, 270.0, 296.0, 268...",402.0
2,14_FROM DOWNTOWN,1400,1,MCC4_20,06:18:00,0,"[162.0, 164.0, 240.0, 168.0, 160.0, 240.0, 254...",240.0
3,14_FROM DOWNTOWN,1400,1,MCC4_20,06:18:00,1,"[580.0, 136.0, 178.0, 230.0, 154.0, 262.0, 212...",232.0
4,14_FROM DOWNTOWN,1400,1,MCC4_20,07:15:00,0,"[148.0, 148.0, 148.0, 240.0, 238.0, 234.0, 242...",244.0
...,...,...,...,...,...,...,...,...
533884,9_TO DOWNTOWN,8600,6,2AVJUNSF,07:48:54,0,"[26.0, 30.0, 42.0, 24.0, 24.0, 40.0, 24.0, 22....",32.0
533885,9_TO DOWNTOWN,8600,6,2AVJUNSF,07:48:57,0,"[34.0, 58.0, 34.0]",58.0
533886,9_TO DOWNTOWN,8600,7,2ASTOSM,07:50:02,0,"[228.0, 242.0, 169.0, 188.0, 284.0, 208.0, 266...",284.0
533887,9_TO DOWNTOWN,8600,7,2ASTOSM,07:50:06,0,"[262.0, 328.0, 197.0]",262.0


## Generate travel distance pairs for all stops

In [66]:
fp = '/media/seconddrive/JP/wego-occupancy-JP/data/static_gtfs/WeGoRawGTFS/04-october-2021-fixed.zip'
feed = gk.read_feed(fp, dist_units='km')
feed.validate()
stop_times_df = gk.get_stop_times(feed)
stop_pairs = []
for trip_id, trip_df in stop_times_df.groupby('trip_id'):
    trip_df['next_stop_id'] = trip_df['stop_id'].shift(-1)
    trip_df = trip_df.fillna(0)
    trip_df['shape_dist_traveled_km'] = (trip_df['shape_dist_traveled'].shift(-1) - trip_df['shape_dist_traveled'])
    trip_df = trip_df[['stop_id', 'next_stop_id', 'shape_dist_traveled_km']][:-1]
    stop_pairs.append(trip_df)
stop_pairs = pd.concat(stop_pairs)
stop_pairs = stop_pairs.drop_duplicates()
stop_pairs.head()

Unnamed: 0,stop_id,next_stop_id,shape_dist_traveled_km
0,MCC4_20,UNI2AEF,0.5776
1,UNI2AEF,1SWOONM,0.6813
2,1SWOONM,1SJAMNM,0.3601
3,1SJAMNM,N1SOLDNM,0.3169
4,N1SOLDNM,DICGRANN,0.861


In [68]:
fp = os.path.join('results/gtfs_distance_pairs_km.pkl')
stop_pairs.to_pickle(fp)

# Sanity checks

In [None]:
travel_time_path = '/home/jptalusan/gits/mta_simulator_redo/code_root/scenarios/baseline/data/sampled_travel_times.pkl'
sampled_travel_time = pd.read_pickle(travel_time_path)
sampled_travel_time

In [None]:
fp = os.path.join('/home/jptalusan/gits/mta_simulator_redo/code_root/scenarios/baseline/data/travel_time_by_scheduled_time.pkl')
tdf = pd.read_pickle(fp).reset_index()

In [None]:
tdf

In [None]:
apcdf.query("trip_id == '219844'").sort_values('scheduled_time')

In [None]:
apcdf.trip_id.unique()

In [None]:
11 * 60 + 22

In [None]:
a = tdf.reset_index().query("route_id_direction == '34_FROM DOWNTOWN' and block_abbr == 3400 and stop_id_original == 'MCC4_22'")
a.head()

In [None]:
tdf.loc[('23_FROM DOWNTOWN', 2311, 8, 'VAIBRIEM',)]

In [None]:
adf = tdf.loc[('23_FROM DOWNTOWN', 2311)]
adf = adf.query('IsWeekend == 0')
# a = 26
# b = 48
# # adf.query('time_window == @a or time_window == @b').sample(1)['time_to_next_stop'].values[0]
# adf = adf.explode('time_to_next_stop').query('time_to_next_stop > 0').reset_index()
adf = adf.explode('time_to_next_stop').reset_index()
# adf[adf['time_window'].isin(range(22, 32))].sample(1)

adf

In [None]:
list(range(0, 10))

In [None]:
len(tdf.loc[('3_TO DOWNTOWN', 300, 1, 'WHICHASF', 45, 0)]['time_to_next_stop'])

In [None]:
import pandas as pd

fp = os.path.join('/home/jptalusan/gits/mta_simulator_redo/code_root/scenarios/baseline/data/gtfs_distance_pairs_km.pkl')
stop_pairs = pd.read_pickle(fp)
if len(stop_pairs.query("stop_id == 'JP'")) > 0:
    print("KP")
stop_pairs

## Generate Disruption probabilities

In [None]:
## Generate Disruption probabilities
# Get service disruption dataset
fp = os.path.join('code/data/Service Disruptions_07_2019_08_2022.csv')
disruptions_df = pd.read_csv(fp)
disruptions_df.head()
disruptions_df['DATETIME'] = disruptions_df['DATE'] + ' ' + disruptions_df['TIME']
disruptions_df['DATE'] = pd.to_datetime(disruptions_df['DATE'], format='%m/%d/%y', errors='coerce')
disruptions_df['TIME'] = pd.to_datetime(disruptions_df['TIME'], format='%H:%M:%S', errors='coerce')
disruptions_df['DATETIME'] = pd.to_datetime(disruptions_df['DATETIME'], format='%m/%d/%y %H:%M:%S', errors='coerce')

# Remove weather related disruptions
# disruptions_df = disruptions_df[(disruptions_df['REASON'] != 'Weather')].sort_values(by=['DATETIME']).reset_index(drop=True)
print('Shape:', disruptions_df.shape)
# disruptions_df = disruptions_df.drop(columns=['COMMENTS'])
disruptions_df['BLOCK'] = disruptions_df['BLOCK'].astype('int32')

# Convert to spark dataframe for merging
# disruptions_sp = spark.createDataFrame(disruptions_df)
# disruptions_sp = disruptions_sp.withColumn("BLOCK", F.col("BLOCK").cast(IntegerType()))
disruptions_counts_df = disruptions_df.groupby('START_STOP_ABBR').agg('count')[['REASON']].reset_index()
disruptions_counts_df.sort_values('REASON')
# Count the number of trips throughout this time
start_date = disruptions_df.sort_values(by=['DATETIME']).iloc[0]['DATETIME']
end_date   = disruptions_df.sort_values(by=['DATETIME']).iloc[-1]['DATETIME']
start_date, end_date

# # filter subset
get_columns = ['transit_date', 'trip_id', 'departure_time', 'stop_id_original']
get_str = ", ".join([c for c in get_columns])

query = f"""
SELECT {get_str}
FROM apc
WHERE (transit_date >= '{start_date.date()}') AND (transit_date <= '{end_date.date()}')
"""
print(query)

apcdataafternegdelete = spark.sql(query)
apcdataafternegdelete = apcdataafternegdelete.dropna()
trips_df = apcdataafternegdelete.toPandas()
trips_df = trips_df.groupby('stop_id_original').agg('count').sort_values('trip_id').reset_index()

In [None]:
merged_df = pd.merge(trips_df, disruptions_counts_df[['START_STOP_ABBR', 'REASON']], left_on='stop_id_original', right_on='START_STOP_ABBR')
all_stop_probabilities = trips_df[['stop_id_original']]
all_stop_probabilities = pd.merge(all_stop_probabilities, merged_df[['stop_id_original', 'probability']], on='stop_id_original', how='outer').fillna(0)
all_stop_probabilities.sort_values('probability')

fp = os.path.join('code/Scenarios/data/disruption_probabilities.pkl')
all_stop_probabilities.to_pickle(fp)

merged_df['probability'] = merged_df['REASON'] / merged_df['transit_date']
merged_df['probability'] = merged_df['probability']/merged_df['probability'].max()
merged_df.sort_values('probability').tail(10)