# 01 · Extract & Transform — Toronto 311 (Customer-Initiated)
Auto-discover yearly resources via CKAN, pull 2021–2025 YTD, clean, and export tidy CSVs.
**Defaults:** exclude `Canceled` statuses for demand analysis.

In [None]:
import os, io, re, requests, pandas as pd
from datetime import date

BASE = "https://ckan0.cf.opendata.inter.prod-toronto.ca"
PKG_ID = "311-service-requests-customer-initiated"
WANT_YEARS = set(range(2021, 2026))  # 2021–2025 inclusive
EXPORT_DIR = "export"
os.makedirs(EXPORT_DIR, exist_ok=True)

def get_package(pkg_id: str):
    res = requests.get(f"{BASE}/api/3/action/package_show", params={"id": pkg_id})
    res.raise_for_status()
    j = res.json()
    assert j.get("success"), j
    return j["result"]

def ds_sql(sql: str) -> pd.DataFrame:
    res = requests.get(f"{BASE}/api/3/action/datastore_search_sql", params={"sql": sql})
    res.raise_for_status()
    j = res.json()
    assert j.get("success"), j
    return pd.DataFrame(j["result"]["records"])

def year_counts_datastore(res_id: str) -> pd.DataFrame:
    sql = f'''
    SELECT
      date_part('year', "Service Request Creation Date and Time")::int AS year,
      "Original Service Request Type" AS type,
      "Service Request Division" AS division,
      "Service Request Ward" AS ward,
      COUNT(*) AS n
    FROM "{res_id}"
    WHERE COALESCE("Service Request Status",'') <> 'Canceled'
    GROUP BY 1,2,3,4
    '''
    return ds_sql(sql)

def daily_totals_datastore(res_id: str) -> pd.DataFrame:
    sql = f'''
    SELECT
      date_trunc('day', "Service Request Creation Date and Time")::date AS day,
      COUNT(*) AS n
    FROM "{res_id}"
    WHERE COALESCE("Service Request Status",'') <> 'Canceled'
    GROUP BY 1
    ORDER BY 1
    '''
    return ds_sql(sql)

def year_counts_csv(res_url: str) -> pd.DataFrame:
    raw = requests.get(res_url).content
    df = pd.read_csv(io.BytesIO(raw))
    # Normalize columns
    ren = {c: c.strip() for c in df.columns}
    df = df.rename(columns=ren)
    # Parse date
    dt = pd.to_datetime(df["Service Request Creation Date and Time"], errors="coerce", utc=True)
    df = df.assign(dt=dt).dropna(subset=["dt", "Original Service Request Type"])
    df = df[df["Service Request Status"].fillna("") != "Canceled"]
    g = (df.assign(year=df["dt"].dt.year)
            .groupby(["year","Original Service Request Type","Service Request Division","Service Request Ward"])
            .size().reset_index(name="n"))
    g.columns = ["year","type","division","ward","n"]
    return g

def daily_totals_csv(res_url: str) -> pd.DataFrame:
    raw = requests.get(res_url).content
    df = pd.read_csv(io.BytesIO(raw))
    ren = {c: c.strip() for c in df.columns}
    df = df.rename(columns=ren)
    dt = pd.to_datetime(df["Service Request Creation Date and Time"], errors="coerce", utc=True)
    df = df.assign(day=dt.dt.date)
    df = df[df["Service Request Status"].fillna("") != "Canceled"]
    g = df.groupby("day").size().reset_index(name="n")
    return g

# Discover yearly resources
pkg = get_package(PKG_ID)
YEAR_RX = re.compile(r"(20\d{2})")
year_to_res = {}
for r in pkg["resources"]:
    name = f"{r.get('name','')} {r.get('description','')}"
    m = YEAR_RX.search(name)
    if m:
        y = int(m.group(1))
        if y in WANT_YEARS:
            year_to_res[y] = r

print("Found years:", sorted(year_to_res.keys()))

# Pull aggregates
yearly_frames, daily_frames = [], []
for y, r in sorted(year_to_res.items()):
    if r.get("datastore_active"):
        yearly_frames.append(year_counts_datastore(r["id"]))
        daily_frames.append(daily_totals_datastore(r["id"]))
    else:
        yearly_frames.append(year_counts_csv(r["url"]))
        daily_frames.append(daily_totals_csv(r["url"]))

df_counts = (pd.concat(yearly_frames, ignore_index=True)
               .astype({"year": int, "n": int})
               .sort_values(["year","n"], ascending=[True, False]))

daily = (pd.concat(daily_frames, ignore_index=True)
           .dropna(subset=["day"])
           .groupby("day", as_index=False)["n"].sum()
           .sort_values("day"))

# Top N complaint types per year
TOP_N = 15
top_by_year = (df_counts.groupby(["year","type"], as_index=False)["n"].sum()
                        .sort_values(["year","n"], ascending=[True, False])
                        .groupby("year").head(TOP_N))

# Export
df_counts.to_csv(os.path.join(EXPORT_DIR, "311_counts_year_type_division_ward_2021_2025.csv"), index=False)
top_by_year.to_csv(os.path.join(EXPORT_DIR, "311_top_types_by_year_2021_2025.csv"), index=False)
daily.to_csv(os.path.join(EXPORT_DIR, "311_daily_totals_2021_2025.csv"), index=False)

print("Exports written to:", EXPORT_DIR)
