In [None]:
import pandas as pd
import numpy as np

# read in static gtfs
# gtfs static file needs to be unzipped into gtfs folder

trips = pd.read_csv('./gtfs/trips.txt', low_memory=False)
routes = pd.read_csv('./gtfs/routes.txt', low_memory=False)
stops = pd.read_csv('./gtfs/stops.txt', low_memory=False)
# GTFS times are "HH:MM:SS" strings measured from the start of the service day
# and may exceed 24:00:00, so they are not valid datetimes. Keep them as plain
# strings: time_to_seconds() below relies on str.split and would break if
# pandas managed to parse the columns into timestamps.
stop_times = pd.read_csv(
    './gtfs/stop_times.txt',
    low_memory=False,
    dtype={'arrival_time': str, 'departure_time': str},
)

# columns that are not used by the headway calculation
stop_times_drop_columns = [
    'stop_headsign',
    'continuous_pickup',
    'continuous_drop_off'
]
# these columns are optional in GTFS; errors='ignore' keeps the cell working
# with feeds that do not ship them
stop_times = stop_times.drop(columns=stop_times_drop_columns, errors='ignore')

# create gtfs static headways

# attach route_type to every trip, then trip/route attributes to every stop time
trips = trips.merge(routes[['route_id','route_type']], how='left', on=['route_id'])
stop_times = stop_times.merge(trips[['trip_id','route_type','route_id','service_id','direction_id']], how='left', on=['trip_id'])
# group stops by checkpoint_id when one is set, otherwise fall back to stop_id
stop_times['stop_id_grouped'] = np.where(stop_times['checkpoint_id'].isna(), stop_times['stop_id'], stop_times['checkpoint_id'])

# keep only route_type 0 and 1 (light rail and subway per the GTFS spec).
# .copy() makes this an independent frame so later column assignments do not
# write into a slice of stop_times (SettingWithCopyWarning).
sub_stop_times = stop_times[(stop_times.route_type < 2)].copy()

def time_to_seconds(time:str) -> int:
    """
    Convert a colon separated "hours:minutes:seconds" string into a total
    number of seconds.

    The hour field is not range checked, so values past 24 hours (e.g.
    "25:30:00") are converted like any other. A ValueError is raised when the
    string does not consist of exactly three integer fields.
    """
    hours, minutes, seconds = map(int, time.split(":"))
    return (hours * 60 + minutes) * 60 + seconds

# columns that identify one "line of departures" for which headways make sense
headway_group_cols = ['direction_id','route_id','service_id','stop_id_grouped']

# Stop times without a departure time cannot contribute to a headway (and
# time_to_seconds would fail on the missing value), so drop them first.
# assign() builds a new frame instead of writing into a slice of stop_times.
sub_stop_times = sub_stop_times.dropna(subset=['departure_time'])
sub_stop_times = sub_stop_times.assign(
    departure_time_sec=sub_stop_times['departure_time'].apply(time_to_seconds)
)

# Order by the numeric departure, not the raw string: "9:00:00" sorts after
# "10:00:00" lexicographically when the hour is not zero padded.
sub_stop_times = sub_stop_times.sort_values(by=headway_group_cols + ['departure_time_sec'])

# Previous departure within the same direction/route/service/stop group.
# Grouping on every key (rather than only comparing stop_id_grouped with the
# previous row) stops a headway from being computed across two different
# routes, services or directions that happen to be adjacent after sorting.
# dropna=False keeps rows whose optional keys (e.g. direction_id) are missing.
sub_stop_times['prev_departure_time_sec'] = (
    sub_stop_times
    .groupby(headway_group_cols, dropna=False)['departure_time_sec']
    .shift()
)

# The first departure of each group has no predecessor and therefore no headway.
sub_stop_times = (
    sub_stop_times
    .dropna(axis=0, subset=['prev_departure_time_sec'])
    .assign(
        prev_departure_time_sec=lambda frame: frame['prev_departure_time_sec'].astype('int'),
        head_way=lambda frame: frame['departure_time_sec'] - frame['prev_departure_time_sec'],
    )
)
sub_stop_times

In [None]:
# stop times that have no checkpoint_id, showing only the two id columns
stop_times.loc[stop_times['checkpoint_id'].isna(), ['stop_id', 'checkpoint_id']]

In [None]:
import boto3
import time
import pathlib
import pickle

# pull down vehicle position file list from s3

BUCKET_NAME = 'mbta-ctd-dataplatform-dev-springboard'
# local cache of the listing so the bucket is not re-listed on every run
# NOTE: the cache is not keyed on PREFIX or the day list below; delete the file
# after changing either, otherwise the stale listing is reused.
FILE_LIST_FILE = pathlib.Path('gtfs_rt_vehicle_pos')
PREFIX = 'lamp/RT_VEHICLE_POSITIONS/year=2022/month=7/day='

s3 = boto3.resource('s3')

bucket = s3.Bucket(BUCKET_NAME)
client = bucket.meta.client

t0 = time.monotonic()

if FILE_LIST_FILE.is_file():
    # NOTE: pickle.load executes arbitrary code from the file; only load a
    # cache file that this notebook wrote itself.
    with open(FILE_LIST_FILE, mode='rb') as f:
        obj_list = pickle.load(f)

else:
    obj_list = []
    # one paginator can be reused for every prefix
    paginator = client.get_paginator('list_objects_v2')
    for prefix in ('20','21','22',):
        pages = paginator.paginate(
            Bucket=BUCKET_NAME,
            Prefix=f'{PREFIX}{prefix}/',
        )
        for page in pages:
            # 'Contents' is absent from the response when no key matches the
            # prefix, so default to an empty list instead of raising KeyError
            for obj in page.get('Contents', []):
                # skip zero byte objects
                if obj['Size'] > 0:
                    obj_list.append(obj)
    with open(FILE_LIST_FILE, mode='wb') as f:
        pickle.dump(obj_list, f)

run_time = time.monotonic() - t0

print(f"{len(obj_list):,} files found in {run_time:.2f} seconds")


In [None]:
import io
from typing import IO

# pull down vehicle positions files from s3 and concat into dataframe

def get_zip_buffer(filename: str) -> IO[bytes]:
    """
    Download an s3 object and hand it back as an in-memory binary buffer.

    filename is "<bucket>/<key>" without the s3:// prefix; everything before
    the first "/" is used as the bucket name and the remainder as the object
    key. The whole object body is read into memory and wrapped in an
    io.BytesIO, which is the only value returned.
    """
    # approach borrowed from
    # https://betterprogramming.pub/unzip-and-gzip-incoming-s3-files-with-aws-lambda-f7bccf0099c9
    bucket_name, object_key = filename.split("/", 1)
    s3_object = boto3.resource("s3").Object(bucket_name=bucket_name, key=object_key)
    payload = s3_object.get()["Body"].read()

    return io.BytesIO(payload)


# vehicle position columns that are not needed for the headway analysis
drop_columns = [
    'occupancy_percentage',
    'occupancy_status',
    'vehicle_id', # always same as 'entity_id'
    # 'start_date', # redundant with timestamp
    # 'vehicle_label',
    'vehicle_consist',
    'bearing',
    'latitude',
    'longitude',
    'trip_id', # Always None?
]
import pandas as pd

# routes present in the static headway table; computed once instead of once
# per downloaded file
headway_route_ids = sub_stop_times.route_id.unique()

# Collect the per-file frames and concatenate once at the end. Calling
# pd.concat inside the loop re-copies every previously loaded row on each
# iteration, which is quadratic in the number of files.
frames = []
for obj in obj_list:
    filename = f"{BUCKET_NAME}/{obj['Key']}"
    print(f"Downloading: {filename}")
    t_df = pd.read_parquet(get_zip_buffer(filename))
    t_df = t_df.drop(columns=drop_columns)
    t_df = t_df[(t_df.route_id.isin(headway_route_ids))]
    frames.append(t_df)
    print(f"Processed: {filename}")

# df stays None when there was nothing to download, as before
df = pd.concat(frames) if frames else None

df

In [None]:
# One row per combination of these columns, with the first and last
# vehicle_timestamp seen for that combination.
group_by_cols = ['entity_id','current_status','current_stop_sequence','stop_id','direction_id','route_id','schedule_relationship','start_date','start_time','vehicle_label']
aggfunc = {'vehicle_timestamp':['min','max']}
# DataFrame.pivot_table takes `values` as its first positional argument, so the
# previous df.pivot_table(df, ...) passed the whole frame as `values`. Name the
# aggregated column explicitly instead. A list (not a bare string) keeps the
# ('vehicle_timestamp', 'min'/'max') two-level column labels used below.
live_headways = df.pivot_table(
    values=['vehicle_timestamp'],
    index=group_by_cols,
    aggfunc=aggfunc,
).reset_index()

live_headways.sort_values(by=['vehicle_label',('vehicle_timestamp','min')])

In [None]:
# NOTE(review): `next_stop` is not defined in any visible cell; this cell only
# runs if it was created elsewhere. It is expected to carry route_id, stop_id,
# direction_id and expect_next_stop_id columns -- confirm.
# NOTE(review): this rebinds `stops`, replacing the GTFS stops table read from
# ./gtfs/stops.txt in the first cell.

# rows where a vehicle was stopped at a station, ordered per vehicle by time
stops = live_headways[(live_headways.current_status == 'STOPPED_AT')].sort_values(by=['vehicle_label',('vehicle_timestamp','min')])
# flatten the two-level column labels: ('stop_id', '') -> 'stop_id',
# ('vehicle_timestamp', 'min') -> 'vehicle_timestamp_min'
stops.columns = [col[0] if col[1] == '' else '_'.join(col).strip() for col in stops.columns.values]
# Next stop visited by the SAME vehicle. A plain shift(-1) would give the last
# row of one vehicle the first stop of the following vehicle.
stops['next_stop_id'] = stops.groupby('vehicle_label')['stop_id'].shift(-1)
stops = stops.merge(next_stop, how='left', on=['route_id','stop_id','direction_id'])

# A row is ok when there is no expectation for it, when the vehicle has no
# later stop to compare against, or when the observed next stop is an expected
# next stop.
# NOTE(review): isin() tests membership in the whole expect_next_stop_id
# column, not equality with this row's expectation -- confirm this is intended.
stops['next_stop_ok'] = (
    stops['expect_next_stop_id'].isna()
    | stops['next_stop_id'].isna()
    | stops['next_stop_id'].isin(stops['expect_next_stop_id'])
)
# stops.groupby(by=['next_stop_ok']).size()
stops[~stops['next_stop_ok']]