In [2]:
import os, sys, shutil, zipfile, csv
import pickle

import fiona
import geopandas
import numpy as np
import pandas as pd
import psycopg2
import requests
from sqlalchemy import URL, create_engine, text
from trino.auth import OAuth2Authentication

In [3]:
# Add in parent directories to sys.path to get multiHook library.
# Starting from the kernel's working directory, walk upward and register each
# ancestor directory on sys.path so the `multihook` import below can resolve.
current_dir = os.getcwd()
for x in range(4):  # Look four levels up
    parent_dir = os.path.dirname(current_dir)
    # Skip directories that are already on sys.path so re-running the cell
    # does not add duplicates.
    if parent_dir not in sys.path:
        sys.path.append(parent_dir)
    current_dir = parent_dir

# NOTE(review): presumably `multihook` lives in one of the four ancestor
# directories added above -- confirm against the repository layout.
import multihook.pycnxn.dbhook as dbhook

# Read in GTFS

In [18]:
def read_gtfs_zip(
    gtfs_dir,
    import_calendar=True,
    import_routes=True,
    import_shapes=True,
    import_stops=True,
    import_stoptimes=True,
    import_trips=True
):
    """
    Load GTFS text files located under ``gtfs_dir`` into pandas dataframes.

    Each file is read with ``pd.read_csv`` from ``os.path.join(gtfs_dir, <file>)``
    using an explicit dtype map, so identifier columns stay strings.

    Parameters
    ----------
    gtfs_dir : path-like
        Location that is joined with each GTFS file name.
        NOTE(review): the function name suggests a zip input, but the path is
        used like a directory -- confirm what callers pass.
    import_calendar : bool
        Read both ``calendar.txt`` and ``calendar_dates.txt``.
    import_routes, import_shapes, import_stops, import_stoptimes, import_trips : bool
        Read ``routes.txt``, ``shapes.txt``, ``stops.txt``, ``stop_times.txt``
        and ``trips.txt`` respectively.

    Returns
    -------
    dict
        Maps ``'calendar'``, ``'calendar_dates'``, ``'routes'``, ``'shapes'``,
        ``'stops'``, ``'stoptimes'`` and ``'trips'`` to dataframes; only the
        tables whose flag was set are present.
    """
    # One entry per file: (flag, progress message or None, file name, output key, dtypes).
    # calendar_dates shares the calendar flag and prints no message of its own.
    load_plan = [
        (
            import_calendar,
            "Loading calendar and calendar_dates",
            "calendar.txt",
            'calendar',
            {
                'service_id': 'str',
                'monday': 'bool',
                'tuesday': 'bool',
                'wednesday': 'bool',
                'thursday': 'bool',
                'friday': 'bool',
                'saturday': 'bool',
                'sunday': 'bool',
                'start_date': 'str',
                'end_date': 'str',
            },
        ),
        (
            import_calendar,
            None,
            "calendar_dates.txt",
            'calendar_dates',
            {
                'service_id': 'str',
                'date': 'str',
                'exception_type': 'Int64',
            },
        ),
        (
            import_routes,
            "Loading routes",
            'routes.txt',
            'routes',
            {
                'route_id': 'str',
                'agency_id': 'str',
                'route_short_name': 'str',
                'route_long_name': 'str',
                'route_desc': 'str',
                'route_type': 'Int64',
                'route_url': 'str',
                'route_color': 'str',
                'route_text_color': 'str',
                'exact_times': 'bool'
            },
        ),
        (
            import_shapes,
            "Loading shapes",
            "shapes.txt",
            'shapes',
            {
                'shape_id': 'str',
                'shape_pt_lat': 'float',
                'shape_pt_lon': 'float',
                'shape_pt_sequence': 'Int64',
                'shape_dist_traveled': 'float',
            },
        ),
        (
            import_stops,
            "Loading stops",
            "stops.txt",
            "stops",
            {
                'stop_id': 'str',
                'stop_name': 'str',
                'stop_lat': 'float',
                'stop_lon': 'float',
                'location_type': 'Int64',
                'parent_station': 'str',
            },
        ),
        (
            import_stoptimes,
            "Loading stop times",
            "stop_times.txt",
            "stoptimes",
            {
                'trip_id': 'str',
                'stop_id': 'str',
                'stop_name': 'str',
                'arrival_time': 'str',
                'departure_time': 'str',
                'stop_sequence': 'Int64'
            },
        ),
        (
            import_trips,
            "Loading trips",
            "trips.txt",
            'trips',
            {
                'route_id': 'str',
                'service_id': 'str',
                'trip_id': 'str',
                'shape_id': 'str',
                'trip_headsign': 'str',
                'direction_id': 'str',
                'block_id': 'str',
                'wheelchair_accessible': 'str',
                'route_direction': 'str',
                'trip_note': 'str',
                'bikes_allowed': 'str'
            },
        ),
    ]

    tables = {}
    for wanted, message, file_name, key, column_types in load_plan:
        # Same equality test against True as the flags have always used.
        if not (wanted == True):
            continue
        if message is not None:
            print(message)
        tables[key] = pd.read_csv(os.path.join(gtfs_dir, file_name), dtype=column_types)

    return tables

In [4]:
def import_bundle(
    bundle,
    import_trips=True,
    import_shapes=True,
    import_stops=True,
    import_stoptimes=True,
    cache_bundle=False,
):
    """Load GTFS tables for one schedule bundle from the Trino data lake.

    The following tables are queried, each filtered to ``bundle``:
    - mtadatalake.core.fact_gtfs_trips (joined to fact_gtfs_routes for agency_id)
    - mtadatalake.core.fact_gtfs_shapes
    - mtadatalake.core.fact_gtfs_stops (left-joined to
      fact_gtfs_shape_reference for revenue_stop)
    - mtadatalake.core.fact_gtfs_stop_times

    If a file named ``<bundle>.pickle`` exists (relative to the working
    directory), it is unpickled and returned as-is; no query is run and the
    ``import_*`` flags are not applied to the cached content.

    Parameters
    ----------
    bundle : str
        Bundle name; interpolated into the SQL and used as the cache file stem.
    import_trips, import_shapes, import_stops, import_stoptimes : bool
        Select which tables are queried.
    cache_bundle : bool
        When true, pickle the freshly loaded result to ``<bundle>.pickle``.

    Returns
    -------
    dict
        Maps ``"trips"``, ``"shapes"``, ``"stops"`` and ``"stoptimes"`` to
        dataframes (only the requested tables are present), or whatever object
        the cache file contained.
    """

    # GTFS bundle can take some time to import. If there is already a cached
    # GTFS bundle pickle file in the repository, read in that data instead.
    picklefile = bundle + ".pickle"
    if os.path.isfile(picklefile):
        # NOTE(review): pickle.load can execute arbitrary code -- only load
        # cache files that this notebook produced itself.
        with open(picklefile, "rb") as handle:
            return pickle.load(handle)

    # NOTE(review): `bundle` is interpolated straight into the SQL text below;
    # only pass trusted bundle names.
    trip_sql = f"""
    with agency as ( -- get agency id for each route
        select distinct route_id, agency_id from mtadatalake.core.fact_gtfs_routes 
        where bundle = '{bundle}'
        )
    SELECT trips.route_id ,trip_id, service_id,trip_headsign,direction_id,block_id,shape_id,boarding_type,bundle, agency.agency_id
    FROM mtadatalake.core.fact_gtfs_trips trips
    join agency on trips.route_id = agency.route_id
    where trips.bundle = '{bundle}'
    """

    shape_sql = f"""
    SELECT shape_id, shape_pt_sequence, shape_pt_lat, shape_pt_lon, bundle
    FROM mtadatalake.core.fact_gtfs_shapes
    where bundle = '{bundle}'
    """

    stop_sql = f"""
    SELECT fact_gtfs_stops.stop_id, stop_name, stop_lat, stop_lon, shape_ref.revenue_stop, bundle
    FROM mtadatalake.core.fact_gtfs_stops
    left join (
        SELECT stop_id, MAX(revenue_stop) AS revenue_stop
        FROM mtadatalake.core.fact_gtfs_shape_reference
        where bundle = '{bundle}'
        GROUP BY stop_id
    ) shape_ref
    on fact_gtfs_stops.stop_id = shape_ref.stop_id
    where bundle = '{bundle}'
    """

    stoptime_sql = f"""
    SELECT trip_id, stop_id, arrival_time, departure_time, timepoint, stop_sequence, pickup_type, drop_off_type, bundle
    FROM mtadatalake.core.fact_gtfs_stop_times
    where bundle = '{bundle}'
    """

    # (flag, progress message, output key, query) in load order.
    load_plan = [
        (import_trips, 'Loading trips', "trips", trip_sql),
        (import_shapes, "Loading shapes", "shapes", shape_sql),
        (import_stops, "Loading stops", "stops", stop_sql),
        (import_stoptimes, "Loading stop times", "stoptimes", stoptime_sql),
    ]

    engine = create_engine(
        r"trino://trino-route-trino.apps.mtasiprod.eastus.aroapp.io:443/mtadatalake",
        connect_args={
            "auth": OAuth2Authentication(),
            "http_scheme": "https",
        }
    )

    output = {}
    try:
        # The context manager closes the connection even if a query fails.
        with engine.connect() as connection:
            for wanted, message, key, sql in load_plan:
                if not wanted:
                    continue
                print(message)
                result = connection.execute(text(sql))
                # Passing the column names explicitly keeps the expected
                # columns even when the query returns zero rows.
                output[key] = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
    finally:
        # Release pooled connections held by the engine.
        engine.dispose()

    if cache_bundle:
        with open(picklefile, "wb") as handle:
            pickle.dump(output, handle, protocol=pickle.HIGHEST_PROTOCOL)
        print(f"GTFS data for {bundle} successfully loaded and cached")
    else:
        print(f"GTFS data for {bundle} successfully loaded")

    return output

In [20]:
def find_most_representative(bundle):
    """Pick one representative service date per bundle and day type.

    The query expands each bundle's GTFS calendar into per-day lists of active
    service_ids (applying calendar_dates exceptions), groups identical lists
    into "schedule variations", and returns, for every bundle and day type
    (Weekday, Saturday, Sunday), the earliest date belonging to the most
    frequent variation.

    Parameters
    ----------
    bundle : str
        NOTE(review): currently unused -- the bundle list is hard-coded in the
        ``bundle_dates`` CTE of the query. Confirm whether it should be driven
        by this argument.

    Returns
    -------
    The object returned by ``dbhook.Hook.frame`` for the query (presumably a
    dataframe with columns bundle, service_date, day_of_week -- confirm against
    the multihook library).
    """
    # NOTE(review): `adls_trino` is not defined in the visible notebook cells;
    # it must already exist in the kernel namespace for this call to work.
    trino = dbhook.Hook(adls_trino)

    # The day-clamping CASE expressions only adjust February (29 in years
    # divisible by 4, otherwise 28) and the 30-day months; an unconditional
    # leap-year branch would rewrite valid dates such as 2024-12-31 to the 29th.
    representative_day_query = '''
    with bundle_dates AS (-- Get a list of dates and bundles
      SELECT service_date, format_datetime(service_date, 'EEEE') day_of_week, bundle, pick_year, pick_name, sched_type, manual
      FROM mtadatalake.core.dim_bus_gtfs_bundle_dates
      where bundle in ('2024Jan_Prod_r01_b03_Predate_Shuttles_v2_i1_scheduled', '2024April_Prod_r01_b07_Predate_02_Shuttles_2_SCHEDULED')
      --  where pick_year in (2024) 
    )
    , gtfs_calendar_fixed_dates as (
      -- Correct invalid dates like February 31st
      select service_id, monday, tuesday, wednesday, thursday, friday, saturday, sunday, bundle
        , start_date/10000 start_yr, start_date/100 % 100 start_mnth
        -- Take out dates like Feb 31 or Apr 31
        , case 
          when start_date/100 % 100 = 2 and (start_date/10000) % 4 = 0 and start_date % 100 > 29 then 29 -- leap-year February: clamp to 2/29
          when start_date/100 % 100 = 2 and (start_date/10000) % 4 <> 0 and start_date % 100 > 28 then 28 -- other Februaries: clamp to 2/28
          when start_date/100 % 100 in (9, 4, 6, 11) and start_date % 100 > 30 then 30
          else start_date % 100
        end as start_day
        , end_date/10000 end_yr, end_date/100 % 100 end_mnth
        , case
          when end_date/100 % 100 = 2 and (end_date/10000) % 4 = 0 and end_date % 100 > 29 then 29 -- leap-year February: clamp to 2/29
          when end_date/100 % 100 = 2 and (end_date/10000) % 4 <> 0 and end_date % 100 > 28 then 28 -- other Februaries: clamp to 2/28
          when end_date/100 % 100 in (9, 4, 6, 11) and end_date % 100 > 30 then 30
          else end_date % 100
        end as end_day
      from mtadatalake.core.fact_gtfs_calendar
      where bundle in (select distinct bundle from bundle_dates)
    )
    , calendar as (
      -- convert start_date to date object
      select service_id, monday, tuesday, wednesday, thursday, friday, saturday, sunday, bundle
        , DATE(cast(start_yr AS VARCHAR) || '-' || CAST(start_mnth AS VARCHAR) || '-' || CAST(start_day AS VARCHAR)) as start_date
        , DATE(CAST(end_yr AS VARCHAR) || '-' || CAST(end_mnth AS VARCHAR) || '-' || CAST(end_day AS VARCHAR)) AS end_date
      from gtfs_calendar_fixed_dates
    )
    , exceptions as (
      SELECT service_id, "date", exception_type, bundle
        , DATE(cast(("date" / 10000) as varchar) || '-' || cast((("date" / 100) % 100) as varchar) || '-' || cast(("date" % 100) as varchar)) AS service_date
        , format_datetime(DATE(cast(("date" / 10000) as varchar) || '-' || cast((("date" / 100) % 100) as varchar) || '-' || cast(("date" % 100) as varchar)), 'EEEE') as day_of_week
      FROM mtadatalake.core.fact_gtfs_calendar_dates
      where bundle in (select distinct bundle from bundle_dates)
    )
    , base_schedule as (
      -- for each day, assemble a row for each service id serving it
      SELECT bd.service_date, bd.day_of_week, c.service_id, c.bundle, e.exception_type
      FROM bundle_dates bd
      inner join calendar c
        ON bd.bundle = c.bundle 
        and bd.service_date BETWEEN c.start_date AND c.end_date
        AND (
          (bd.day_of_week = 'Monday' AND c.monday = 1) OR
          (bd.day_of_week = 'Tuesday' AND c.tuesday = 1) OR
          (bd.day_of_week = 'Wednesday' AND c.wednesday = 1) OR
          (bd.day_of_week = 'Thursday' AND c.thursday = 1) OR
          (bd.day_of_week = 'Friday' AND c.friday = 1) OR
          (bd.day_of_week = 'Saturday' AND c.saturday = 1) OR
          (bd.day_of_week = 'Sunday' AND c.sunday = 1)
          )
      -- Join in dates and service_ids where service was removed (exception_type 2)
      left join (select * from exceptions where exception_type = 2) e
        on c.bundle = e.bundle
        and bd.service_date = e.service_date
        and c.service_id = e.service_id
    )
    , modified_schedules as (
      -- Join in dates and service_ids where service was added (exception_type 1)
      select service_date, day_of_week, service_id, bundle, exception_type
      from base_schedule 
      where exception_type is null or exception_type = 1
      union all
      select service_date, day_of_week, service_id, bundle, exception_type
      from exceptions 
      where exception_type = 1
    )
    , daily_schedules as (
      -- Designate schedule daytype
      select service_date, day_of_week, service_id, bundle
        , CASE 
            WHEN day_of_week(service_date) BETWEEN 1 AND 5 THEN 'Weekday'
            WHEN day_of_week(service_date) = 6 THEN 'Saturday'
            WHEN day_of_week(service_date) = 7 THEN 'Sunday'
            ELSE null
          end as sched_daytype
      from modified_schedules
    )
    , schedules_per_day AS (
        -- Aggregate service_ids for each day to form a "schedule"
        SELECT 
            bundle, service_date, sched_daytype
            , array_join(array_agg(CAST(service_id AS VARCHAR) ORDER BY service_id), ',') AS schedule
        FROM daily_schedules
        GROUP BY bundle, service_date, sched_daytype
    )
    , schedule_variations AS (
      -- Count the occurrences of each unique schedule
      SELECT bundle, service_date, schedule, sched_daytype
        , COUNT(service_date) over (partition by bundle, schedule, sched_daytype) AS sched_var_frequency
        -- Make a name for each unique schedule, e.g. Weekday-1, Saturday-3
        ,  sched_daytype || '-' || CAST(DENSE_RANK() OVER (PARTITION BY bundle, sched_daytype ORDER BY schedule) AS VARCHAR) AS schedule_variation
      FROM schedules_per_day
    )
    , schedule_variations_ranked as ( -- this CTE returns one row per day, with the schedule variation and rank
      select bundle, service_date, schedule, sched_daytype, schedule_variation, sched_var_frequency
        , row_number() over (partition by bundle, sched_daytype order by sched_var_frequency desc, service_date) ranking_by_freq
      from schedule_variations
    )
    , most_representative_day as ( -- this CTE will give a weekday, saturday, and sunday date for each bundle
      -- When ranking_by_freq = 1, that day is the best representation of a weekday, saturday, or sunday schedule
        select bundle, service_date, sched_daytype as day_of_week
        from schedule_variations_ranked
        where ranking_by_freq = 1
    )
    -- decide between schedule_variations_ranked, most_representative_day, or sched_with_service_id
    select * from most_representative_day
    '''
    return trino.frame(representative_day_query)

### Load all GTFS files

### GTFS 1

In [5]:
# Load the first bundle to compare. With the default arguments import_bundle
# fetches trips, shapes, stops and stop times and does not write a cache file;
# an existing "<bundle>.pickle" file would be returned instead of querying.
gtfs1 = import_bundle('2025June_Prod_r04_b01_PREDATE_SHUTTLES_SCHEDULED')

Loading trips
Open the following URL in browser for the external authentication:
https://trino-route-trino.apps.mtasiprod.eastus.aroapp.io/oauth2/token/initiate/c120148181b7ff92591be1b51a0bbd8beaa8103b68b905038b9f05a497397747
Loading shapes
Loading stops
Loading stop times
GTFS data for 2025June_Prod_r04_b01_PREDATE_SHUTTLES_SCHEDULED successfully loaded


In [7]:
# Tables from bundle 1 that feed the per-route stop list
trips1, stoptimes1, stops1 = gtfs1['trips'], gtfs1['stoptimes'], gtfs1['stops']

# trips -> stop_times on trip_id (outer join), then -> stops on stop_id
# (default inner join) to pick up stop_name
joined1 = (
    trips1
    .merge(stoptimes1, how='outer', on='trip_id')
    .merge(stops1, on='stop_id')
)

# keep one row per (route_id, direction_id, stop_id) with only the requested columns
stoplist1 = (
    joined1
    .loc[:, ['route_id', 'direction_id', 'stop_id', 'stop_name']]
    .drop_duplicates(subset=['route_id', 'direction_id', 'stop_id'])
)

# order the stop list by route_id, then direction_id, then stop_id
sortedstoplist1 = stoplist1.sort_values(by=['route_id', 'direction_id', 'stop_id'])

### GTFS 2

In [8]:
# Load the second bundle to compare, using the same default import_bundle
# arguments (all four tables, no cache file written).
gtfs2 = import_bundle('2025March_Prod_r01_b05_SHUTTLES_PREDATE_SCHEDULED')

Loading trips
Loading shapes
Loading stops
Loading stop times
GTFS data for 2025March_Prod_r01_b05_SHUTTLES_PREDATE_SCHEDULED successfully loaded


In [9]:
# Tables from bundle 2 that feed the per-route stop list
trips2, stoptimes2, stops2 = gtfs2['trips'], gtfs2['stoptimes'], gtfs2['stops']

# trips -> stop_times on trip_id (outer join), then -> stops on stop_id
# (default inner join) to pick up stop_name
joined2 = (
    trips2
    .merge(stoptimes2, how='outer', on='trip_id')
    .merge(stops2, on='stop_id')
)

# keep one row per (route_id, direction_id, stop_id) with only the requested columns
stoplist2 = (
    joined2
    .loc[:, ['route_id', 'direction_id', 'stop_id', 'stop_name']]
    .drop_duplicates(subset=['route_id', 'direction_id', 'stop_id'])
)

# order the stop list by route_id, then direction_id, then stop_id
sortedstoplist2 = stoplist2.sort_values(by=['route_id', 'direction_id', 'stop_id'])

#### Merging

In [12]:
# Full outer join of the two stop lists on route_id, direction_id, stop_id.
# stop_name from each bundle is kept side by side as stop_name_b1 / stop_name_b2,
# so a stop missing from one bundle shows up with that bundle's name blank.
merged = pd.merge(sortedstoplist1, sortedstoplist2, how='outer', on=['route_id', 'direction_id','stop_id'], suffixes=('_b1', '_b2'))

# Sort and select the output columns.
# sort_values returns a new frame, so its result has to be kept; sorting happens
# before fillna so the sort keys are not mixed with '' placeholders.
# NOTE(review): 'stop_id' is selected twice, so the CSV has two identical
# stop_id columns -- confirm whether that layout is intended.
final = merged.sort_values(by=['route_id', 'direction_id'])[[
    'route_id', 'direction_id',
    'stop_id', 'stop_name_b1',
    'stop_id', 'stop_name_b2'
]]

# Fill missing values with empty string
final = final.fillna('')

final.to_csv("all_stops_combined1.csv", index=False)
