In [1]:
import csv
import io
import os
import zipfile
from datetime import datetime

import requests
from nyct_gtfs import NYCTFeed
from nyct_gtfs.gtfs_static_types import Stations
from sqlalchemy import create_engine, text

In [2]:
# Database connection settings are read from the environment so that no secret
# is stored in the notebook. Non-secret values keep their previous defaults.
# NOTE(review): the password that used to be hardcoded here is exposed in the
# notebook's history and should be rotated.
DB_USER = os.environ.get("DB_USER", "neondb_owner")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "")
DB_HOST = os.environ.get("DB_HOST", "ep-spring-truth-ae312q45-pooler.c-2.us-east-2.aws.neon.tech")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "neondb")

if not DB_PASSWORD:
    # Warn at configuration time instead of failing later with an opaque auth error.
    print("WARNING: DB_PASSWORD is not set; export it before running the database cell.")

def log_with_time(message):
    """Print ``message`` prefixed with the current local time, to the millisecond.

    The line has the form ``[YYYY-MM-DD HH:MM:SS.mmm] message``.
    """
    now = datetime.now()
    # Truncate (not round) microseconds to three digits.
    millis = now.microsecond // 1000
    stamp = f"{now:%Y-%m-%d %H:%M:%S}.{millis:03d}"
    print(f"[{stamp}] {message}")

In [14]:
_SECONDS_PER_DAY = 24 * 60 * 60


def _delay_minutes(arrival_time, observed):
    """Return minutes between ``observed`` and a GTFS ``arrival_time`` string.

    ``arrival_time`` is ``H:MM:SS`` / ``HH:MM:SS``; GTFS allows hours of 24 or
    more for service that runs past midnight, which ``strptime('%H')`` rejects,
    so the fields are parsed by hand. Only the time of day of ``observed`` is
    used and the difference is wrapped into [-12h, +12h), so a train observed
    just after midnight against a 23:58 (or 24:10) schedule yields a delay of
    a few minutes rather than roughly a whole day.
    """
    hours, minutes, seconds = (int(part) for part in arrival_time.strip().split(":"))
    scheduled_s = hours * 3600 + minutes * 60 + seconds
    observed_s = (
        observed.hour * 3600
        + observed.minute * 60
        + observed.second
        + observed.microsecond / 1_000_000
    )
    half_day = _SECONDS_PER_DAY / 2
    delay_s = (observed_s - scheduled_s + half_day) % _SECONDS_PER_DAY - half_day
    return delay_s / 60.0


def process_trains(train_list, direction):
    """Compute schedule adherence for each realtime train at its current stop.

    For every train, the timetable row for ``(trip_id, location)`` is looked up
    in the module-level ``schedule_index``. When a row exists, the delay is the
    difference between the train's ``last_position_update`` and the scheduled
    ``arrival_time``. Trains without a matching row are logged and skipped; a
    failure while processing one train is logged and does not stop the others.

    Args:
        train_list: Sequence of realtime trip objects exposing ``trip_id``,
            ``location`` and ``last_position_update`` (a ``datetime``).
        direction: Label stored on each record and used in log messages
            (the notebook passes ``"N"`` or ``"S"``).

    Returns:
        list[dict]: One record per matched train with the keys ``trip_id``,
        ``stop_id``, ``station_name``, ``timestamp``, ``delay_min``, ``status``
        (``"on_time"`` when within one minute, else ``"delayed"``/``"early"``),
        ``direction``, ``stop_sequence`` and ``rush_hour``.

    Note:
        Reads the module-level names ``schedule_index``, ``stations`` and
        ``log_with_time``; they must be defined before this function is called.
    """
    results = []
    total = len(train_list)
    log_with_time(f"Processing {total} {direction}bound trains...")
    for i, train in enumerate(train_list):
        trip_id = train.trip_id
        stop_id = train.location
        current_time = train.last_position_update

        matching_schedule = schedule_index.get((trip_id, stop_id))
        if matching_schedule:
            try:
                delay = _delay_minutes(matching_schedule["arrival_time"], current_time)
                station_name = stations.get_station_name(stop_id)
                status = "on_time" if abs(delay) < 1 else "delayed" if delay > 0 else "early"

                results.append({
                    "trip_id": trip_id,
                    "stop_id": stop_id,
                    "station_name": station_name,
                    "timestamp": current_time,
                    "delay_min": round(delay, 2),
                    "status": status,
                    "direction": direction,
                    # `or 0` also covers an empty CSV field, which int() would reject.
                    "stop_sequence": int(matching_schedule.get("stop_sequence") or 0),
                    "rush_hour": "morning" if 7 <= current_time.hour < 10 else "evening" if 16 <= current_time.hour < 19 else None
                })
            except Exception as e:
                # Best effort: one malformed row or train must not abort the batch.
                log_with_time(f"ERROR processing train {trip_id}: {e}")
        else:
            log_with_time(f"No schedule match for trip_id={trip_id}, stop_id={stop_id}")

        if (i + 1) % 5 == 0:
            log_with_time(f"Processed {i + 1}/{total} {direction}bound trains")
    return results

In [15]:
# Pipeline steps 1-4: fetch the realtime N-line trips, download the static GTFS
# bundle, index stop_times.txt by (trip_id, stop_id), then compute delays.
log_with_time("Step 1: Fetching realtime MTA data...")
feed = NYCTFeed("N")
trains = feed.filter_trips(line_id=['N'], underway=True)
northbound_trains = [t for t in trains if t.direction == 'N']
southbound_trains = [t for t in trains if t.direction == 'S']
log_with_time(f"Fetched {len(northbound_trains)} northbound and {len(southbound_trains)} southbound trains")

# Bind these names up front: process_trains reads schedule_index/stations as
# globals and the database cell reads all_results, so they must exist even when
# the expensive steps below are skipped.
schedule_index = {}
stations = Stations()
all_results = []

if not northbound_trains and not southbound_trains:
    # A notebook cell cannot `return`; "exiting early" means skipping the
    # download/index/compute steps in the else-branch.
    log_with_time("No trains currently underway - exiting early")
else:
    log_with_time("Step 2: Downloading GTFS static data...")
    url = "https://rrgtfsfeeds.s3.amazonaws.com/gtfs_supplemented.zip"
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    log_with_time(f"Downloaded GTFS ZIP ({len(response.content):,} bytes)")

    log_with_time("Step 3: Extracting and parsing stop_times.txt...")
    with zipfile.ZipFile(io.BytesIO(response.content)) as zip_file:
        stop_times_file_path = next((f for f in zip_file.namelist() if f.endswith('stop_times.txt')), None)
        if not stop_times_file_path:
            raise FileNotFoundError("stop_times.txt not found")
        log_with_time(f"Found stop_times.txt at {stop_times_file_path}")

        # Stream-decode the member instead of reading the whole file into one
        # string first. 'utf-8-sig' drops a leading BOM if present; with plain
        # 'utf-8' a BOM would stick to the first header name and every
        # row.get("trip_id") lookup would return None.
        with zip_file.open(stop_times_file_path) as raw_stop_times, \
                io.TextIOWrapper(raw_stop_times, encoding='utf-8-sig', newline='') as stop_times_file:
            csv_reader = csv.DictReader(stop_times_file)
            # NOTE(review): the saved output of this cell shows 19 trains fetched
            # but an empty result list, i.e. no (trip_id, stop_id) key matched.
            # Presumably the realtime trip_ids are not byte-identical to the
            # static trip_ids (e.g. the static ids carry a service prefix) -
            # confirm and normalize the key if so.
            for i, row in enumerate(csv_reader):
                trip_id = row.get("trip_id")
                stop_id = row.get("stop_id")
                if trip_id and stop_id:
                    schedule_index[(trip_id, stop_id)] = row
                if i > 0 and i % 200000 == 0:
                    log_with_time(f"Indexed {i:,} rows from stop_times.txt...")
            log_with_time(f"Finished indexing stop_times.txt with {len(schedule_index):,} entries")

    log_with_time("Step 4: Calculating delays...")
    all_results = process_trains(northbound_trains, "N") + process_trains(southbound_trains, "S")
    log_with_time(f"Total delay records computed: {len(all_results):,}")

    if not all_results:
        log_with_time("No delay data found - exiting")

# Last expression of the cell: display the computed records.
all_results

[2025-07-28 17:43:14.464] Step 1: Fetching realtime MTA data...
[2025-07-28 17:43:14.616] Fetched 9 northbound and 10 southbound trains
[2025-07-28 17:43:14.616] Step 2: Downloading GTFS static data...
[2025-07-28 17:43:15.250] Downloaded GTFS ZIP (19,204,055 bytes)
[2025-07-28 17:43:15.250] Step 3: Extracting and parsing stop_times.txt...
[2025-07-28 17:43:15.251] Found stop_times.txt at stop_times.txt
[2025-07-28 17:43:16.588] Indexed 200,000 rows from stop_times.txt...
[2025-07-28 17:43:17.099] Indexed 400,000 rows from stop_times.txt...
[2025-07-28 17:43:17.647] Indexed 600,000 rows from stop_times.txt...
[2025-07-28 17:43:18.200] Indexed 800,000 rows from stop_times.txt...
[2025-07-28 17:43:18.729] Indexed 1,000,000 rows from stop_times.txt...
[2025-07-28 17:43:19.270] Indexed 1,200,000 rows from stop_times.txt...
[2025-07-28 17:43:19.847] Indexed 1,400,000 rows from stop_times.txt...
[2025-07-28 17:43:20.383] Indexed 1,600,000 rows from stop_times.txt...
[2025-07-28 17:43:20.909]

[]

In [None]:

# Pipeline step 5: persist the computed delay records to PostgreSQL.
log_with_time("Step 5: Connecting to database...")
engine = create_engine(f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")

# Create the table in its own transaction so it is committed regardless of
# what happens to the inserts below.
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS train_delays (
            id SERIAL PRIMARY KEY,
            trip_id TEXT,
            stop_id TEXT,
            station_name TEXT,
            timestamp TIMESTAMP,
            delay_min REAL,
            status TEXT,
            direction TEXT,
            stop_sequence INTEGER,
            rush_hour TEXT
        )
    """))
log_with_time("Database table verified")

insert_sql = text("""
    INSERT INTO train_delays (
        trip_id, stop_id, station_name, timestamp,
        delay_min, status, direction, stop_sequence, rush_hour
    ) VALUES (
        :trip_id, :stop_id, :station_name, :timestamp,
        :delay_min, :status, :direction, :stop_sequence, :rush_hour
    )
""")

batch_size = 500
inserted_records = 0
failed_batches = 0
with engine.begin() as conn:
    for i in range(0, len(all_results), batch_size):
        batch = all_results[i:i + batch_size]
        batch_number = (i // batch_size) + 1
        try:
            # Each batch runs inside a SAVEPOINT. On PostgreSQL a failed
            # statement aborts the enclosing transaction, so merely catching
            # the exception would make every later batch fail and the final
            # commit persist nothing. Rolling back to the savepoint keeps the
            # per-batch best-effort behaviour working.
            with conn.begin_nested():
                conn.execute(insert_sql, batch)
            inserted_records += len(batch)
            log_with_time(f"Inserted batch {batch_number} of {len(batch)} records")
        except Exception as e:
            failed_batches += 1
            log_with_time(f"Insert error in batch {batch_number}: {e}")

# Leaving the engine.begin() block commits the outer transaction.
if failed_batches:
    log_with_time(f"Committed {inserted_records} records; {failed_batches} batch(es) failed and were rolled back")
else:
    log_with_time("All batches committed")