# MBTA API Data Exploration 🚆

This notebook was created as the **exploration and prototyping stage** of the *MBTA Airflow – Snowflake Data Pipeline* project.

The purpose of this notebook is to:
- Connect to the **MBTA public API**.
- Explore and understand the **data structure** and response format.
- Select and clean the key features required for the data pipeline.
- Prototype transformations using **Pandas** before automating them in **Apache Airflow**.

Once validated, this logic was later modularized into Python functions within the Airflow DAG.


In [3]:
import requests
import pandas as pd
import json

In [1]:
%pip install requests pandas

Note: you may need to restart the kernel to use updated packages.


In [5]:
import os, requests, pandas as pd

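# Read the key from the environment; never hard-code it in the notebook.
# If it is unset, MBTA_API_KEY is None and requests omits the header.
MBTA_API_KEY = os.getenv("MBTA_API_KEY")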

url = (
    "https://api-v3.mbta.com/predictions"
    "?filter[route]=Green-B"            # try Red, Orange, Blue, 1 (bus), Green-C/D/E
    "&sort=departure_time"
    "&page[limit]=5"
    "&include=stop,route"
)

r = requests.get(url, headers={"x-api-key": MBTA_API_KEY})
r.raise_for_status()
j = r.json()

# Flatten a few useful fields
rows = []
for item in j.get("data", []):
    a = item["attributes"]
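    rows.append({
        # "route" is not an attribute; it sits under item["relationships"],
        # so this lookup returns None (see the output below).
        "route": a.get("route", None),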
        "direction": a.get("direction_id"),
        "status": a.get("status"),
        "arrival_time": a.get("arrival_time"),
        "departure_time": a.get("departure_time"),
        "stop_seq": a.get("stop_sequence"),
    })

df = pd.DataFrame(rows)
print(df if not df.empty else "No rows returned. Try a different route (e.g., Red or 1).")


  route  direction status               arrival_time departure_time  stop_seq
0  None          0   None  2025-10-07T09:54:16-04:00           None       310
1  None          1   None  2025-10-07T09:08:33-04:00           None       610
2  None          0   None  2025-10-07T10:05:39-04:00           None       310
3  None          1   None  2025-10-07T09:19:56-04:00           None       610
4  None          1   None  2025-10-07T09:25:54-04:00           None       610
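
The `route` column is `None` above because the MBTA API follows the JSON:API layout: a prediction's route and stop are linked under `relationships` and are not part of `attributes`. The sketch below shows one way to flatten a single item, assuming the `relationships.<name>.data.id` shape. The sample payload is hand-written for illustration and is not a real API response, and `flatten_prediction` is a hypothetical helper name.

```python
def flatten_prediction(item):
    """Flatten one JSON:API prediction resource into a flat dict."""
    attrs = item.get("attributes", {})
    rels = item.get("relationships", {})

    def related_id(name):
        # A relationship can be absent or carry "data": None.
        data = (rels.get(name) or {}).get("data") or {}
        return data.get("id")

    return {
        "route": related_id("route"),
        "stop": related_id("stop"),
        "direction": attrs.get("direction_id"),
        "status": attrs.get("status"),
        "arrival_time": attrs.get("arrival_time"),
        "departure_time": attrs.get("departure_time"),
        "stop_seq": attrs.get("stop_sequence"),
    }


# Hand-written sample item (hypothetical ids, not real API output).
sample_item = {
    "type": "prediction",
    "id": "prediction-example",
    "attributes": {
        "direction_id": 0,
        "status": None,
        "arrival_time": "2025-10-07T09:54:16-04:00",
        "departure_time": None,
        "stop_sequence": 310,
    },
    "relationships": {
        "route": {"data": {"id": "Green-B", "type": "route"}},
        "stop": {"data": {"id": "example-stop", "type": "stop"}},
        "vehicle": {"data": None},
    },
}

flat = flatten_prediction(sample_item)
print(flat["route"], flat["stop_seq"])
```

With a helper like this, the route no longer has to be hard-coded when the rows are built.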


In [9]:
%pip install snowflake-connector-python

Collecting snowflake-connector-python
  Downloading snowflake_connector_python-3.18.0-cp312-cp312-win_amd64.whl.metadata (76 kB)
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting boto3>=1.24 (from snowflake-connector-python)
  Downloading boto3-1.40.46-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore>=1.24 (from snowflake-connector-python)
  Downloading botocore-1.40.46-py3-none-any.whl.metadata (5.7 kB)
Collecting s3transfer<0.15.0,>=0.14.0 (from boto3>=1.24->snowflake-connector-python)
  Downloading s3transfer-0.14.0-py3-none-any.whl.metadata (1.7 kB)
Downloading snowflake_connector_python-3.18.0-cp312-cp312-win_amd64.whl (1.2 MB)

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
aiobotocore 2.12.3 requires botocore<1.34.70,>=1.34.41, but you have botocore 1.40.46 which is incompatible.


In [15]:
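import os
import snowflake.connector
import pandas as pd

# --- your MBTA dataframe from earlier ---
# df  (already created from the API)

# --- connect to Snowflake ---
# Credentials are read from environment variables so they never appear in the
# notebook. The SNOWFLAKE_* names are placeholders; use whatever your
# environment defines.
conn = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],          # same as the login in Snowflake
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],    # from your Snowflake URL
    warehouse="COMPUTE_WH",
    database="SNOWFLAKE_LEARNING_DB",
    schema="PUBLIC"
)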

cur = conn.cursor()

# --- insert dataframe rows into Snowflake ---
for _, row in df.iterrows():
    cur.execute("""
        INSERT INTO MBTA_LIVE_PREDICTIONS
        (ROUTE, DIRECTION, STATUS, ARRIVAL_TIME, DEPARTURE_TIME, STOP_SEQ)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (
        "Green-B",                             # or whichever route you fetched
        int(row["direction"]) if pd.notna(row["direction"]) else None,
        row["status"],
        row["arrival_time"],
        row["departure_time"],
        int(row["stop_seq"]) if pd.notna(row["stop_seq"]) else None
    ))

conn.commit()
print("✅ Data inserted successfully!")

cur.close()
conn.close()


✅ Data inserted successfully!
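
The loop above issues one `INSERT` per row. A common alternative is to build all parameter tuples first and pass them to the cursor's DB-API `executemany` method in one call. The sketch below illustrates that idea under a few assumptions: `to_int_or_none`, `rows_to_params`, and `insert_rows` are hypothetical helper names, and `RecordingCursor` is a stand-in so the example runs without a Snowflake connection.

```python
INSERT_SQL = """
    INSERT INTO MBTA_LIVE_PREDICTIONS
    (ROUTE, DIRECTION, STATUS, ARRIVAL_TIME, DEPARTURE_TIME, STOP_SEQ)
    VALUES (%s, %s, %s, %s, %s, %s)
"""


def to_int_or_none(value):
    """Return int(value), or None when the value is missing (None or NaN)."""
    if value is None or value != value:  # NaN is the only value unequal to itself
        return None
    return int(value)


def rows_to_params(rows, route):
    """Turn a list of row dicts into parameter tuples matching INSERT_SQL."""
    return [
        (
            route,
            to_int_or_none(r.get("direction")),
            r.get("status"),
            r.get("arrival_time"),
            r.get("departure_time"),
            to_int_or_none(r.get("stop_seq")),
        )
        for r in rows
    ]


def insert_rows(cur, rows, route):
    """Insert all rows with a single executemany call; return the row count."""
    params = rows_to_params(rows, route)
    if params:
        cur.executemany(INSERT_SQL, params)
    return len(params)


class RecordingCursor:
    """Stand-in for a DB-API cursor so the sketch runs without Snowflake."""

    def __init__(self):
        self.calls = []

    def executemany(self, sql, seq_of_params):
        self.calls.append((sql, list(seq_of_params)))


demo_rows = [
    {"direction": 0, "status": None,
     "arrival_time": "2025-10-07T09:54:16-04:00",
     "departure_time": None, "stop_seq": 310},
    {"direction": float("nan"), "status": None,
     "arrival_time": "2025-10-07T09:08:33-04:00",
     "departure_time": None, "stop_seq": 610},
]
demo_cursor = RecordingCursor()
inserted = insert_rows(demo_cursor, demo_rows, "Green-B")
print(inserted)
```

In the notebook, `rows` (the list built before `pd.DataFrame(rows)`) and the real `cur` would take the place of `demo_rows` and `demo_cursor`.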


In [17]:
%pip install snowflake-connector-python requests pandas

Note: you may need to restart the kernel to use updated packages.


In [21]:
url = "https://api-v3.mbta.com/predictions"
params = {
    "filter[route]": "Green-B",
    "sort": "departure_time",
    "page[limit]": 10,
    "include": "stop,route"
}

response = requests.get(url, params=params)
print(response.status_code)

if response.status_code != 200:
    print(response.text)
else:
    data = response.json()
    print("✅ Success:", len(data.get("data", [])), "records fetched.")


200
✅ Success: 10 records fetched.


In [23]:
params = {
    "api_key": MBTA_API_KEY,
    "filter[route]": "Green-B",
    "sort": "departure_time",
    "page[limit]": 10,
    "include": "stop,route"
}
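
The cell above only defines `params`; nothing is sent with it. The first request in this notebook passed the key in the `x-api-key` header instead, which keeps it out of the URL. The sketch below builds the request arguments in one place so the key is attached only when it is set. `build_prediction_request` is a hypothetical helper, and the 10-second timeout is an assumed value.

```python
import os

PREDICTIONS_URL = "https://api-v3.mbta.com/predictions"


def build_prediction_request(route, limit=10, api_key=None):
    """Return keyword arguments suitable for requests.get(**kwargs)."""
    params = {
        "filter[route]": route,
        "sort": "departure_time",
        "page[limit]": limit,
        "include": "stop,route",
    }
    headers = {}
    if api_key:
        # Send the key as a header only when one is configured.
        headers["x-api-key"] = api_key
    return {
        "url": PREDICTIONS_URL,
        "params": params,
        "headers": headers,
        "timeout": 10,  # assumed value; avoids hanging on a stalled connection
    }


request_kwargs = build_prediction_request(
    "Green-B", api_key=os.getenv("MBTA_API_KEY")
)
print(request_kwargs["params"])
```

The result can be passed straight to `requests.get(**request_kwargs)`.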


In [25]:
import requests
import pandas as pd
import snowflake.connector
from datetime import datetime

# =======================
# 1️⃣ MBTA API Setup
# =======================
url = "https://api-v3.mbta.com/predictions"
params = {
    "filter[route]": "Green-B",   # you can also try "Red", "Orange", or bus routes like "1"
    "sort": "departure_time",
    "page[limit]": 10,
    "include": "stop,route"
}

response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()

rows = []
for item in data.get("data", []):
    attr = item["attributes"]
    rows.append({
        "route": "Green-B",
        "direction": attr.get("direction_id"),
        "status": attr.get("status"),
        "arrival_time": attr.get("arrival_time"),
        "departure_time": attr.get("departure_time"),
        "stop_seq": attr.get("stop_sequence")
    })

df = pd.DataFrame(rows)
print(f"✅ {len(df)} MBTA records fetched at {datetime.now()}")

# =======================
# 2️⃣ Snowflake Connection
# =======================
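import os  # credentials come from the environment, not from the notebook

# The SNOWFLAKE_* variable names are placeholders; use whatever your
# environment defines.
conn = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    warehouse="COMPUTE_WH",
    database="SNOWFLAKE_LEARNING_DB",
    schema="PUBLIC"
)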
cur = conn.cursor()

# =======================
# 3️⃣ Insert into Snowflake
# =======================
for _, row in df.iterrows():
    cur.execute("""
        INSERT INTO MBTA_LIVE_PREDICTIONS
        (ROUTE, DIRECTION, STATUS, ARRIVAL_TIME, DEPARTURE_TIME, STOP_SEQ)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (
        row["route"],
        int(row["direction"]) if pd.notna(row["direction"]) else None,
        row["status"],
        row["arrival_time"],
        row["departure_time"],
        int(row["stop_seq"]) if pd.notna(row["stop_seq"]) else None
    ))

conn.commit()
cur.close()
conn.close()
print("✅ Data successfully inserted into Snowflake!")


✅ 10 MBTA records fetched at 2025-10-07 10:15:25.562812
✅ Data successfully inserted into Snowflake!
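
The timestamps are loaded as ISO 8601 strings carrying a local offset (for example `-04:00`). If the pipeline should store them in UTC, one possible cleaning step is sketched below. Whether the target columns expect UTC is an assumption, and `to_utc` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def to_utc(ts):
    """Parse an ISO 8601 string with an offset and convert it to UTC.

    Returns None for missing values so nulls pass through unchanged.
    """
    if not ts:
        return None
    return datetime.fromisoformat(ts).astimezone(timezone.utc)


arrival = to_utc("2025-10-07T09:54:16-04:00")
print(arrival.isoformat())  # 2025-10-07T13:54:16+00:00
```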


## Summary and Next Steps

✅ Successfully fetched live MBTA train prediction data (Green-B line).  
✅ Cleaned and structured the data using Pandas.  
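✅ Inserted the rows into the Snowflake table `MBTA_LIVE_PREDICTIONS`. No CSV file is written in this notebook; the CSV export (`/tmp/mbta_data.csv`) belongs to the pipeline version of this logic.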

**Next Step:**  
The logic prototyped here was converted into modular Python functions inside the Airflow DAG:
- `fetch_mbta_data()` for API extraction and CSV creation.
- `load_to_snowflake()` for database insertion and automation.

This notebook demonstrates the exploration phase before moving to a fully orchestrated pipeline.
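
The two functions listed above split the work into a step that produces a CSV and a step that loads it. The sketch below shows what that CSV hand-off might look like using only the standard library. It is not the DAG's actual code: `write_rows_to_csv` and `read_rows_from_csv` are hypothetical names, and the demo writes to a temporary directory, not to `/tmp/mbta_data.csv`.

```python
import csv
import os
import tempfile

FIELDS = ["route", "direction", "status",
          "arrival_time", "departure_time", "stop_seq"]


def write_rows_to_csv(rows, path):
    """Write row dicts to a CSV file with a header; None becomes an empty cell."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        writer.writeheader()
        writer.writerows(rows)
    return path


def read_rows_from_csv(path):
    """Read the CSV back into dicts, mapping empty cells to None."""
    with open(path, newline="", encoding="utf-8") as f:
        return [
            {key: (value if value != "" else None) for key, value in row.items()}
            for row in csv.DictReader(f)
        ]


demo_path = os.path.join(tempfile.mkdtemp(), "mbta_data.csv")
write_rows_to_csv(
    [{"route": "Green-B", "direction": 0, "status": None,
      "arrival_time": "2025-10-07T09:54:16-04:00",
      "departure_time": None, "stop_seq": 310}],
    demo_path,
)
loaded = read_rows_from_csv(demo_path)
print(loaded[0]["route"], loaded[0]["stop_seq"])
```

CSV does not preserve types: every value comes back as a string (`"310"`, not `310`), so the loading step has to cast integers again before inserting.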
