In [None]:
import polars as pl
from glob import glob
import os
import seaborn as sb


def get_dfs_from_glob(glob_str: str):
    dfs = [
        pl.read_csv(i, dtypes={'stopid': pl.Utf8, 'routeid': pl.Utf8})
        for i in glob(glob_str)
    ]
    return pl.concat(dfs, how='vertical').unique()


def load_raw_data():
    """
    Load raw data from existing parquet file or from CSVs.
    """
    df = (pl.read_parquet('raw.parquet')
          if os.path.exists('raw.parquet')
          else get_dfs_from_glob('raw_data/raw_trip_*').unique()
          )
    # Save memory by re-using strings for categorical data
    return df.cast({
        'period': pl.Categorical,
        'routeid': pl.Categorical,
        'stopid': pl.Categorical
    })


MAX_DELAY = 60*30


def plot_stop(dfs: pl.DataFrame, stopid: str):
    cats = (
        dfs
        .filter(pl.col('delay').abs() < MAX_DELAY)
        .filter(pl.col('stopid') == stopid)
        .group_by('id')
        .all()
        .select('id', 'delay', 'lastupdate')
        .with_columns(pl.col('delay').list.len().alias('length'))
        .explode('delay', 'lastupdate')
        .sort('id', 'lastupdate')
        .with_columns(pl.col('id').cast(pl.Utf8).cast(pl.Categorical))
    )
    sb.scatterplot(cats, x='lastupdate', y='delay', hue='id')

In [None]:
stops = pl.read_csv(
    '/home/chrlz/dox/dl/ETS_Bus_Schedule_GTFS_Data_Feed_-_Stops_20240216.csv')

In [None]:
raw_dfs = load_raw_data()

In [None]:
# Loading from the CSVs take like 30 seconds!!!
# Save into a more compact form for easier retrieval later
raw_dfs.write_parquet('raw.parquet')

So we know that all trip id is unique for a single day, no need to worry about overlaps

Average delay in a stop every 3 minutes 10 recordings of a bus

AM: 7am-9am
PM: 4pm-7pm
OFF: 5am-7am, 9am-4-pm, 7pm-10pm

In [None]:
# Aggregate all data
MAX_DELAY = 60 * 20
delay = pl.col('delay')
aggregated = (
    raw_dfs
    .with_columns(delay.abs())
    .with_columns(
        pl.from_epoch('lastupdate', time_unit='s')
        # MST
        .dt.offset_by('-7h')
    )
    .with_columns(
        pl.col('lastupdate').dt.date().alias('date'),
        pl.col('lastupdate').dt.weekday().alias('day'),
    )
    # Get only the weekdays (6 = saturday, 7 = sunday)
    .filter(pl.col('day') < 6)
    .sort('lastupdate')
    .group_by('id', 'stopid', 'date', 'period')
    .agg(
        pl.col('routeid').first(),
        pl.col('lastupdate').max(),
        delay.max().alias('maxdelay'),
        delay.mean().alias('meandelay'),
        delay.median().alias('mediandelay'),
        delay.std().alias('stddelay'),
        delay
    )
    .with_columns(pl.col('date').dt.weekday().alias('day'))
    .with_columns(pl.col('lastupdate').dt.hour().alias('hour'))
    # Just remove trips that are above MAX_DELAY
    .filter(pl.col('meandelay') < MAX_DELAY)
)

In [None]:
def agg_group(df: pl.DataFrame, *groups: str):
    return (
        df.group_by(groups)
        .agg(
            pl.col('meandelay').mean().alias('avgdelay'),
            pl.col('meandelay').max().alias('maxdelay'),
            pl.col('lastupdate')
        )
        .sort(groups)
    )


# 1. Map delay average over week
mean_stop = aggregated.pipe(agg_group, 'stopid')

# 2. Over week by day
# TODO: Filter by PEAK, OFF, and DAY
by_day_all = aggregated.pipe(agg_group, 'day', 'period')
by_day_peak = (
    aggregated
    .filter(pl.col('period') != 'OFF')
    .pipe(agg_group, 'day', 'period')
)
by_day_off = (
    aggregated
    .filter(pl.col('period') == 'OFF')
    .pipe(agg_group, 'day', 'period')
)


# 3. Over day by hour
by_hour = aggregated.pipe(agg_group, 'day', 'hour')


def select_stop(df: pl.DataFrame, stopid: str):
    return (
        df.filter(pl.col('stopid') == stopid)
        .pipe(agg_group, 'routeid')
        .drop('lastupdate')
    )

def select_stop_and_route(df: pl.DataFrame, stopid: str, routeid: str):
    return (
        df.filter(
            (pl.col('stopid') == stopid) & 
            (pl.col('routeid') == routeid)
        )
        .pipe(agg_group, 'day', 'hour')
        .drop('lastupdate')
    )


# TODO: Select route
# 4. Time series graph of route by hour by day


highest_delay_stops = (
    mean_stop
    .group_by('stopid')
    .agg(pl.col('avgdelay').max())
    .sort('avgdelay', descending=True)
    .select('stopid')
    .head(100)
    .unique()
    .join(aggregated, on='stopid')
)
routes_on_highest_delay_stops = (
    highest_delay_stops
    .select('routeid')
    .unique()
    .join(aggregated, on='routeid')
)
agg_routes_on_highest_delay = (
    routes_on_highest_delay_stops
    .pipe(agg_group, 'routeid')
    .drop('lastupdate')
)

def plot_by_route(df: pl.DataFrame):
    tidy = (
        agg_routes_on_highest_delay
        .sort('routeid')
        .melt(id_vars='routeid')
        .sort('routeid')
    )
    xticks = tidy['routeid']
    xticks_unique = tidy['routeid'].unique()
    ax = sb.barplot(
        data=tidy,
        x='routeid',
        y='value',
        hue='variable',
        width=1,
        order=xticks,
    )
    ax.set_xticks(ax.get_xticks(), xticks_unique, rotation=90)


# 5. Mapping delay propagations within a route
# 6. TODO

def plot_stop_and_route(df: pl.DataFrame, stop: str, route: str):
    days= pl.DataFrame({
        "day": [*range(1, 6)],
        "letter": ['M', 'T', 'W', 'R', 'F'],
    }, schema={"day": pl.Int8, 'letter': pl.Categorical})
    by_day_by_hour_for_stop = (
        select_stop_and_route(df,stop,route)
        .join(days, on='day')
        .drop('day')
        .rename({'letter': 'day'})
    )

    ax = sb.barplot(by_day_by_hour_for_stop, x='hour', y='avgdelay', hue='day')
    ax.set_title(f'Average delay for route {route} stop {stop}')

plot_stop_and_route(aggregated, "2260", "637")
# select_stop(aggregated, "2260")

# sb.barplot(by_day_by_hour_for_stop, x='hour', y='maxdelay')


highest_delay_stops['stopid'].unique()
# select_stop(aggregated, "2260")

Some exploration of the `trips.json` file which DOES contain coordinates of the
trip

From initial observation of the data, each trip may have a unique path, even for
the same route (e.g. shorter route for off-peak hours, my bus route home was
like that at one point)

Other assumptions:
- There is only one type of geometry line: MultiLineString
- The actual coordinates are wrapped inside another JSON array

In [None]:
def load_trips_df():
    if os.path.exists('data/trips.parquet'):
        return pl.read_parquet('data/trips.parquet')

    import json
    return (
        pl.DataFrame(json.load(open('data/trips.json')))
        # Trim JSON data according to the assumptions 
        .with_columns(pl.col('geometry_line').struct.field('coordinates'))
        .with_columns(pl.col('coordinates').list.get(0))
        .drop('geometry_line')
    )
trips = load_trips_df()
if not os.path.exists('data/trips.parquet'):
    trips.write_parquet('data/trips.parquet')

How many counts should we deem useful for visualization of data?

In [None]:
pl.Config.set_fmt_table_cell_list_len(100)
aggregated['routeid']
# 1_250_916