In [1]:
# Sign in to AWS through the browser; per the captured output below, this updates the credentials stored in the default CLI profile.
!aws login

Attempting to open your default browser.
If the browser does not open, open the following URL:

https://us-east-1.signin.aws.amazon.com/v1/authorize?response_type=code&client_id=arn%3Aaws%3Asignin%3A%3A%3Adevtools%2Fsame-device&state=2d2387cb-b253-45af-a508-8fd58780a877&code_challenge_method=SHA-256&scope=openid&redirect_uri=http%3A%2F%2F127.0.0.1%3A60441%2Foauth%2Fcallback&code_challenge=VXqMAVSV4iEQ028uGtLLBFKY9YNlyndDWPM-vfMjypc

Updated profile default to use arn:aws:sts::549787090008:assumed-role/AWSReservedSSO_mse-tl-dataeng300-EMR_da0cc2e9e5742c69/phf6371 credentials.


In [2]:
import os
import time
import argparse
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

import requests
import pandas as pd
import boto3

import boto3
import requests
import zipfile
import tempfile
from pathlib import Path
from botocore.exceptions import ClientError

In [3]:
# boto3 session and S3 client pinned to the us-east-1 region.
# NOTE(review): main() further down creates its own client with boto3.client("s3")
# and does not reference `s3` -- confirm whether this cell is still needed.
session = boto3.Session(region_name='us-east-1')
s3 = session.client('s3')

# Bucket name and key prefix used to build the keys below.
BUCKET = 'bustrust'
PREFIX = 'datasets'

# S3 keys derived from PREFIX.
# NOTE(review): the archive key ends in 'ml-1.zip' while the extraction prefix is
# 'ml-1m/' -- confirm the archive name is not missing an 'm'.
zip_key = f'{PREFIX}/ml-1.zip'
extract_prefix = f'{PREFIX}/ml-1m/'

In [4]:
# Time zone used for every timestamp this notebook writes (log labels, the
# `pulled_at` column, and the stamps embedded in S3 file names).
CHICAGO_TZ = ZoneInfo("America/Chicago")


def chunk_list(xs, n=10):
    """Split a sequence into consecutive slices of at most ``n`` items.

    Args:
        xs: A sliceable sequence (e.g. a list) to partition.
        n: Maximum number of items per slice; must be at least 1.

    Returns:
        A list of slices of ``xs`` in their original order. The last slice may
        be shorter than ``n``; an empty ``xs`` yields an empty list.

    Raises:
        ValueError: If ``n`` is less than 1.
    """
    if n < 1:
        # A negative step makes range() empty, which would silently drop
        # every item instead of reporting the bad argument.
        raise ValueError(f"n must be >= 1, got {n}")
    return [xs[i : i + n] for i in range(0, len(xs), n)]


def get_routes(session: requests.Session, api_key: str):
    """Fetch the route identifiers exposed by the CTA Bus Tracker `getroutes` endpoint.

    Args:
        session: HTTP session used to issue the GET request.
        api_key: Bus Tracker API key, sent as the ``key`` query parameter.

    Returns:
        A list with the ``"rt"`` value of every route entry that has one.

    Raises:
        ValueError: If the response contains no routes; the message includes
            the ``error`` entry of the response, when present.
        Exception: Whatever ``raise_for_status()`` raises for a non-success
            HTTP status.
    """
    # Let the HTTP client build the query string so the key is URL-encoded
    # rather than spliced verbatim into the URL.
    r = session.get(
        "https://www.ctabustracker.com/bustime/api/v3/getroutes",
        params={"key": api_key, "format": "json"},
        timeout=30,
    )
    r.raise_for_status()
    payload = r.json().get("bustime-response", {})

    routes = payload.get("routes", [])
    if not routes:
        err = payload.get("error", [])
        raise ValueError(f"No routes returned. Error: {err}")

    return [rt["rt"] for rt in routes if "rt" in rt]


def get_api(session: requests.Session, url: str):
    """GET ``url`` through ``session`` and return the decoded JSON body.

    Args:
        session: HTTP session used to issue the request (30 second timeout).
        url: Fully built request URL.

    Returns:
        The value produced by the response's ``json()`` method.

    Raises:
        ValueError: If the response's ``ok`` flag is false; the message
            carries the HTTP status code.
    """
    response = session.get(url, timeout=30)
    if not response.ok:
        raise ValueError(f"API request failed (status={response.status_code})")
    return response.json()


def append_vehicles_to_csv(data, outfile: str, pulled_at: str, rt_chunk: str) -> int:
    """Append the vehicles found in an API response to a CSV file.

    Args:
        data: Decoded API response; vehicles are read from
            ``data["bustime-response"]["vehicle"]``.
        outfile: Path of the CSV file to append to (created if missing).
        pulled_at: Value stored in the ``pulled_at`` column of every row.
        rt_chunk: Value stored in the ``rt_chunk`` column of every row.

    Returns:
        Number of rows appended; 0 when the response carries no vehicles
        (in which case the file is not touched).

    Note:
        Rows are appended with whatever columns this response produced; they
        are not reconciled against a header already present in ``outfile``.
    """
    vehicles = data.get("bustime-response", {}).get("vehicle", None)
    if not vehicles:
        return 0

    df = pd.DataFrame(vehicles)
    df["pulled_at"] = pulled_at
    df["rt_chunk"] = rt_chunk

    # A file that exists but is still empty has no header yet, so it must be
    # treated like a missing file; otherwise the CSV ends up headerless.
    needs_header = not os.path.exists(outfile) or os.path.getsize(outfile) == 0
    df.to_csv(outfile, mode="a", header=needs_header, index=False)
    return len(df)


def _current_outfile(out_dir: str) -> str:
    return os.path.join(out_dir, "bus_data_current_chicago.csv")


def _s3_key_for_chunk(chunk_start_dt: datetime, chunk_hours: float) -> str:
    chunk_end_dt = chunk_start_dt + timedelta(hours=chunk_hours)
    start_stamp = chunk_start_dt.strftime("%Y-%m-%d_%H-%M-%S")
    end_stamp = chunk_end_dt.strftime("%Y-%m-%d_%H-%M-%S")
    filename = f"bus_data_{start_stamp}_to_{end_stamp}_chicago.csv"
    return os.path.join("data_collection", filename)


def _upload_file_to_s3_with_key(s3, bucket_name: str, local_path: str, s3_key: str) -> str:
    s3.upload_file(local_path, bucket_name, s3_key)
    return s3_key


def main(
    api_key: str,
    runtime_seconds: int,
    per_chunk_sleep: int = 5,
    per_sweep_sleep: int = 30,
    out_dir: str = "data",
    chunk_hours: float = 6.0,
    s3_bucket: str = "bustrust",
    no_s3_upload: bool = False,
):
    """Poll the CTA Bus Tracker `getvehicles` endpoint and archive the rows.

    Routes are fetched once, split into groups of 10, and every group is
    queried in turn ("a sweep") until ``runtime_seconds`` have elapsed. Rows
    are appended to one local CSV file. Each time a chunk window of
    ``chunk_hours`` ends, that file is uploaded to S3 under a time-stamped key
    and deleted locally; whatever remains is uploaded once more on exit.

    Args:
        api_key: Bus Tracker API key.
        runtime_seconds: Total collection time in seconds; must be > 0.
        per_chunk_sleep: Seconds to sleep after each route-group request.
        per_sweep_sleep: Seconds to sleep after each full sweep.
        out_dir: Directory holding the local CSV file (created if missing).
        chunk_hours: Length of one upload window in hours; must be > 0.
        s3_bucket: Destination bucket for uploads.
        no_s3_upload: When true, no S3 client is created and nothing is
            uploaded; rows keep accumulating in the local file.

    Raises:
        ValueError: If ``chunk_hours`` or ``runtime_seconds`` is not positive,
            or if the route list cannot be fetched.
    """
    if chunk_hours <= 0:
        raise ValueError("chunk_hours must be > 0 (e.g., 6).")
    if runtime_seconds <= 0:
        raise ValueError("runtime_seconds must be > 0.")

    os.makedirs(out_dir, exist_ok=True)

    session = requests.Session()
    routes = get_routes(session, api_key)
    route_chunks = chunk_list(routes, n=10)

    start_ts = time.time()
    end_ts = start_ts + runtime_seconds

    # Never let the window truncate to 0 seconds: that would trigger a
    # rollover (and an upload attempt) on every single check.
    chunk_seconds = max(1, int(chunk_hours * 3600))

    # Chunk schedule (fixed relative to start). The label used for S3 file
    # names is always derived from the epoch timestamp so both stay in step.
    chunk_start_ts = start_ts
    chunk_start_dt = datetime.fromtimestamp(chunk_start_ts, CHICAGO_TZ)
    next_rollover_ts = chunk_start_ts + chunk_seconds

    # Single local file
    outfile = _current_outfile(out_dir)

    s3_client = None
    if not no_s3_upload:
        s3_client = boto3.client("s3")

    print(f"Found {len(routes)} routes -> {len(route_chunks)} route-chunks")
    print(f"Local output (single file): {os.path.abspath(outfile)}")
    print(f"Chunk: {chunk_hours} hours ({chunk_seconds} seconds)")
    print(f"Sleep: per_chunk={per_chunk_sleep}s | per_sweep={per_sweep_sleep}s | runtime={runtime_seconds}s")
    print("S3 upload:", "DISABLED" if no_s3_upload else f"ENABLED -> s3://{s3_bucket}/data_collection/")
    print("Upload policy: ONLY at chunk boundary; delete local file after successful upload.")
    print("If upload fails: keep writing into the SAME local file; do NOT advance chunk window.")

    sweep_num = 0
    call_num = 0
    total_rows = 0

    def rollover_if_needed():
        """
        The only place an upload happens while the collection loop is running.
        If we crossed the chunk boundary, upload the single local file to a
        time-stamped S3 key.
        - On success: delete the local file and advance the window by one chunk.
        - On failure: keep the local file and do NOT advance the window, so the
          upload is retried on the next check.
        """
        nonlocal chunk_start_ts, chunk_start_dt, next_rollover_ts

        if time.time() < next_rollover_ts:
            return

        # Only try to upload if the file exists and has non-zero size; an
        # empty window still advances.
        has_data = os.path.exists(outfile) and os.path.getsize(outfile) > 0

        if not no_s3_upload and s3_client is not None and has_data:
            s3_key = _s3_key_for_chunk(chunk_start_dt, chunk_hours)
            try:
                _upload_file_to_s3_with_key(s3_client, s3_bucket, outfile, s3_key)
                print(f"  [S3] uploaded -> s3://{s3_bucket}/{s3_key}")

                # Delete local file so we never keep multiple files.
                os.remove(outfile)
                print(f"  [LOCAL] deleted -> {outfile}")

            except Exception as e:
                print(f"  [S3] upload ERROR (continuing with same local file): {e}")
                # Do NOT advance chunk window if upload failed.
                return

        # Advance exactly one window per call, even if several boundaries
        # have passed (there is only one local file to attribute rows to).
        chunk_start_ts = next_rollover_ts
        # Recompute the label from the timestamp instead of adding a
        # timedelta to the aware datetime: wall-clock addition would drift
        # from elapsed time by an hour across a DST change.
        chunk_start_dt = datetime.fromtimestamp(chunk_start_ts, CHICAGO_TZ)
        next_rollover_ts = chunk_start_ts + chunk_seconds
        print(f"--- Rolled over chunk window. Continuing in same local file: {os.path.basename(outfile)}")

    try:
        while time.time() < end_ts:
            rollover_if_needed()

            sweep_num += 1
            now_label = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d %H:%M:%S %Z")
            print(f"--- Sweep {sweep_num} @ {now_label} ---")

            for i, chunk in enumerate(route_chunks):
                if time.time() >= end_ts:
                    break

                rollover_if_needed()

                rt_param = ",".join(chunk)
                url = (
                    "https://www.ctabustracker.com/bustime/api/v3/getvehicles"
                    f"?key={api_key}&rt={rt_param}&format=json"
                )
                pulled_at = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d %H:%M:%S %Z")

                try:
                    data = get_api(session, url)
                    n_rows = append_vehicles_to_csv(data, outfile, pulled_at=pulled_at, rt_chunk=rt_param)
                    call_num += 1
                    total_rows += n_rows
                    print(
                        f"[Call {call_num}] routeset {i+1}/{len(route_chunks)}: +{n_rows} rows "
                        f"(total {total_rows}) -> {os.path.basename(outfile)}"
                    )
                except Exception as e:
                    # Best effort: one failed request must not stop the run.
                    call_num += 1
                    print(f"[Call {call_num}] routeset {i+1}/{len(route_chunks)} ERROR: {e}")

                if time.time() < end_ts:
                    # Never sleep past the end of the run.
                    time.sleep(min(per_chunk_sleep, max(0, end_ts - time.time())))

            if time.time() < end_ts:
                sleep_now = min(per_sweep_sleep, max(0, end_ts - time.time()))
                print(f"--- Sweep {sweep_num} complete. Sleeping {sleep_now:.0f}s ---")
                time.sleep(sleep_now)
    finally:
        # Runs on normal completion AND on interruption (e.g. the kernel being
        # interrupted mid-run), so collected rows are not stranded locally.
        session.close()

        # ALWAYS upload whatever remains ONCE at shutdown (unless S3 disabled)
        if (not no_s3_upload) and (s3_client is not None):
            if os.path.exists(outfile) and os.path.getsize(outfile) > 0:
                s3_key = _s3_key_for_chunk(chunk_start_dt, chunk_hours)
                try:
                    _upload_file_to_s3_with_key(s3_client, s3_bucket, outfile, s3_key)
                    print(f"  [S3] uploaded (exit) -> s3://{s3_bucket}/{s3_key}")
                    os.remove(outfile)
                    print(f"  [LOCAL] deleted (exit) -> {outfile}")
                except Exception as e:
                    print(f"  [S3] upload ERROR on exit (file kept): {e}")

        print(f"\nDone. Sweeps: {sweep_num}, calls: {call_num}, total rows written: {total_rows}")

In [6]:
# The CTA Bus Tracker key is read from the environment so the secret is not
# stored in the notebook. The key that used to be hard-coded in this cell
# should be treated as exposed and rotated.
cta_api_key = os.environ.get("CTA_API_KEY")
if not cta_api_key:
    raise RuntimeError("Set the CTA_API_KEY environment variable before running this cell.")

main(
    api_key=cta_api_key,
    runtime_seconds=39600,  # 11 hours
    per_chunk_sleep=5,
    per_sweep_sleep=30,
    out_dir="data/test3",
    chunk_hours=6,
    s3_bucket='bustrust',
    no_s3_upload=False,
)

Found 124 routes -> 13 route-chunks
Local output (single file): /Users/pedrohenriquefarina/CTABusTrust/CTABusTrust/data/test3/bus_data_current_chicago.csv
Chunk: 6 hours (21600 seconds)
Sleep: per_chunk=5s | per_sweep=30s | runtime=39600s
S3 upload: ENABLED -> s3://bustrust/data_collection/
Upload policy: ONLY at chunk boundary; delete local file after successful upload.
If upload fails: keep writing into the SAME local file; do NOT advance chunk window.
--- Sweep 1 @ 2026-02-19 23:41:14 CST ---
[Call 1] routeset 1/13: +34 rows (total 34) -> bus_data_current_chicago.csv
[Call 2] routeset 2/13: +27 rows (total 61) -> bus_data_current_chicago.csv
[Call 3] routeset 3/13: +29 rows (total 90) -> bus_data_current_chicago.csv
[Call 4] routeset 4/13: +18 rows (total 108) -> bus_data_current_chicago.csv
[Call 5] routeset 5/13: +27 rows (total 135) -> bus_data_current_chicago.csv
[Call 6] routeset 6/13: +19 rows (total 154) -> bus_data_current_chicago.csv
[Call 7] routeset 7/13: +39 rows (total 