# Notebook 01 — TMDB Ingestion (RAW Layer)
---
## Goal
Build a single point-in-time snapshot of streaming provider catalog availability across 7 markets and 5 providers, for both movies and TV series.
Availability is inferred via the TMDB Discover endpoint filtered by `watch_region`, `with_watch_providers`, and `with_watch_monetization_types=flatrate`.

## Outputs
- `data/raw/providers_lookup.parquet`
- `data/raw/discover_snapshot.parquet`
- `data/raw/titles_metadata_raw.parquet`
- `data/raw/manifest.json`

## Design Notes
- Synchronous HTTP requests with exponential backoff on 429 / 5xx
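- Discover fetch is skipped when `discover_snapshot.parquet` already exists; the file is written only after the full fetch completes, so an interrupted fetch restarts from scratch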
- Metadata fetch supports checkpoint/resume on `(tmdb_id, media_type)` key
- TV runtime estimated via median of `episode_run_time` when available
---

## Imports & Configuration
Edit `MAX_PAGES_PER_QUERY` and `MAX_TITLES_METADATA` to limit the run for debugging.

In [1]:
import os
import time
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional, List, Tuple

import requests
import pandas as pd
from pathlib import Path

SNAPSHOT_DATE = "2026-02-21"

COUNTRIES = ["US", "GB", "IT", "DE", "FR", "ES", "KR"]  # TMDB uses GB for the UK
MEDIA_TYPES = ["movie", "tv"]
MONETIZATION_TYPE = "flatrate"

# Brands in scope for the benchmark.
# NOTE(review): provider matching is driven by TARGET_PROVIDER_ALIASES in the
# provider-lookup cell, not by this list — keep the two in sync.
TARGET_PROVIDER_NAMES = [
    "Netflix",
    "Amazon Prime Video",
    "Disney Plus",
    "Apple TV Plus",
    "Paramount Plus",
]

# Debug knobs — set to None to run full pagination / full metadata pull
MAX_PAGES_PER_QUERY = None
MAX_TITLES_METADATA = None

# Project root detection — works whether running from /notebooks or project root.
# Inside a directory named "notebooks" the root is its parent; anywhere else the
# current working directory is taken as the project root.
cwd = Path.cwd()
PROJECT_ROOT = cwd.parent if cwd.name == "notebooks" else cwd

# RAW-layer output directory, stored as str and combined with os.path.join below.
DATA_RAW_DIR = str(PROJECT_ROOT / "data" / "raw")
os.makedirs(DATA_RAW_DIR, exist_ok=True)

DISCOVER_PATH = os.path.join(DATA_RAW_DIR, "discover_snapshot.parquet")

# Fail fast: every later cell talks to the TMDB API.
TMDB_API_KEY = os.getenv("TMDB_API_KEY")
if not TMDB_API_KEY:
    raise RuntimeError(
        "Missing TMDB_API_KEY environment variable. "
        "Set it before running and restart your IDE/kernel."
    )

TMDB_BASE_URL = "https://api.themoviedb.org/3"

## TMDB Client
Synchronous HTTP client with API key injection, configurable timeout, and exponential backoff on 429 and 5xx errors.

In [2]:
@dataclass
class TMDBClient:
    """Minimal synchronous client for the TMDB v3 REST API.

    Every request carries ``api_key`` as a query parameter and is followed by a
    fixed ``sleep_between_calls_s`` throttle. Transient failures are retried up
    to ``max_retries`` attempts in total with exponential backoff: HTTP 429,
    500/502/503/504, network-level ``requests`` errors, and undecodable JSON.
    Any other non-200 status fails immediately.
    """

    api_key: str
    base_url: str = TMDB_BASE_URL
    timeout_s: int = 30
    sleep_between_calls_s: float = 0.1
    max_retries: int = 5

    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """GET ``base_url + path`` with ``params`` and return the decoded JSON body.

        Args:
            path: API path starting with "/", e.g. "/discover/movie".
            params: Extra query parameters; not mutated (``api_key`` is added
                to a copy).

        Raises:
            RuntimeError: right away for a non-retryable HTTP status (e.g. 401,
                404), or once ``max_retries`` attempts have failed.
        """
        query = {**(params or {}), "api_key": self.api_key}
        url = f"{self.base_url}{path}"
        last_error = "no attempt made"

        for attempt in range(1, self.max_retries + 1):
            # Exponential backoff base: 1, 2, 4, 8, ... seconds.
            backoff_s = 2.0 ** (attempt - 1)
            try:
                r = requests.get(url, params=query, timeout=self.timeout_s)
            except requests.RequestException as e:
                # Timeout / connection error: worth retrying.
                last_error = f"{type(e).__name__}: {e}"
            else:
                time.sleep(self.sleep_between_calls_s)

                if r.status_code == 200:
                    try:
                        return r.json()
                    except ValueError as e:
                        last_error = f"invalid JSON body: {e}"
                elif r.status_code in (429, 500, 502, 503, 504):
                    last_error = f"TMDB error {r.status_code}: {r.text[:300]}"
                    if r.status_code == 429:
                        # Rate limited: back off harder, and honour Retry-After if sent.
                        backoff_s *= 2.0
                        retry_after = r.headers.get("Retry-After", "")
                        if retry_after.isdigit():
                            backoff_s = max(backoff_s, float(retry_after))
                    else:
                        backoff_s *= 1.5
                else:
                    # Permanent error (bad key, unknown id, ...): retrying cannot help.
                    # This sits in the try's `else`, so it is not caught and retried.
                    raise RuntimeError(f"TMDB error {r.status_code}: {r.text[:300]}")

            # No point sleeping after the final attempt.
            if attempt < self.max_retries:
                time.sleep(backoff_s)

        raise RuntimeError(f"TMDB request failed after retries: {last_error}")


client = TMDBClient(api_key=TMDB_API_KEY)

## Provider Lookup
Fetches the full TMDB provider list for movie and TV, then matches target brands using a normalized alias map.
Paramount Plus is intentionally kept as two provider IDs (Essential + Premium) to avoid missing titles.

In [3]:
import re

def fetch_providers(media_type: str, watch_region: Optional[str] = "US") -> pd.DataFrame:
    """Fetch TMDB's watch-provider list for ``media_type`` ("movie" or "tv").

    Args:
        media_type: TMDB media type used in the endpoint path.
        watch_region: ISO 3166-1 code sent as the ``watch_region`` filter.
            Defaults to "US" (the previous hard-coded value). Pass another
            market to inspect its provider list, or ``None``/"" to omit the
            filter entirely.

    Returns:
        One row per provider with whichever of provider_id, provider_name,
        display_priority and logo_path TMDB returned, plus a
        ``media_type_source`` column. When TMDB returns no results, the
        DataFrame is empty and has no columns.
    """
    params = {"watch_region": watch_region} if watch_region else {}
    data = client.get(f"/watch/providers/{media_type}", params=params)
    df = pd.DataFrame(data.get("results", []))
    if df.empty:
        return df
    keep = [c for c in ["provider_id", "provider_name", "display_priority", "logo_path"] if c in df.columns]
    df = df[keep].copy()
    df["media_type_source"] = media_type
    return df

def norm_name(s: str) -> str:
    """Normalize a provider name for matching.

    The name is lowercased and "+" is spelled out as "plus". Every run of
    characters outside [a-z0-9] becomes a single space, and the result is
    trimmed. ``None`` or an empty string yields "".
    """
    lowered = (s or "").strip().lower().replace("+", " plus ")
    spaced = re.sub(r"[^a-z0-9]+", " ", lowered)
    return " ".join(spaced.split())

providers_movie = fetch_providers("movie")
providers_tv = fetch_providers("tv")
providers_all = pd.concat([providers_movie, providers_tv], ignore_index=True)

# Check emptiness before de-duplicating. If both fetches came back empty, the
# concatenated frame has no columns, and drop_duplicates(subset=...) would raise
# a KeyError instead of this actionable error.
if providers_all.empty:
    raise RuntimeError("TMDB provider list is empty. Check API key / connectivity.")

# A provider listed for both movie and TV keeps only its first (movie) row.
providers_all = providers_all.drop_duplicates(subset=["provider_id", "provider_name"])

# Normalized names (e.g. "Disney+" -> "disney plus") are used for all matching below.
providers_all["provider_name_norm"] = providers_all["provider_name"].map(norm_name)

# Exclude add-on channels, plan variants, and third-party storefronts.
# This is a literal substring match on the normalized name (regex=False).
BLACKLIST_TERMS = [
    "with ads", "standard with ads", "kids",
    "channel", "amazon channel", "roku", "apple tv channel",
    "store", "free with ads",
]
for term in BLACKLIST_TERMS:
    providers_all = providers_all[
        ~providers_all["provider_name_norm"].str.contains(norm_name(term), na=False, regex=False)
    ]

# Alias map: canonical brand name → TMDB provider name variants
TARGET_PROVIDER_ALIASES = {
    "Netflix": ["netflix"],
    "Amazon Prime Video": ["amazon prime video"],
    "Disney Plus": ["disney plus"],
    "Apple TV Plus": ["apple tv"],  # TMDB uses "Apple TV" for the Apple TV+ service
    "Paramount Plus": ["paramount plus essential", "paramount plus premium"],
}

matched_rows = []
missing_targets = []

for canonical_name, aliases in TARGET_PROVIDER_ALIASES.items():
    # Exact match on the normalized name against any alias.
    alias_norm = [norm_name(a) for a in aliases]
    hits = providers_all[providers_all["provider_name_norm"].isin(alias_norm)].copy()

    if hits.empty:
        missing_targets.append(canonical_name)
        continue

    # Paramount Plus keeps every matching plan id (Essential + Premium).
    if canonical_name == "Paramount Plus":
        for _, h in hits.iterrows():
            row = h.copy()
            row["canonical_provider"] = canonical_name
            matched_rows.append(row)
        continue

    # Other brands keep a single id: the lowest display_priority when available.
    if "display_priority" in hits.columns:
        hits = hits.sort_values("display_priority", ascending=True)
    best = hits.iloc[0].copy()
    best["canonical_provider"] = canonical_name
    matched_rows.append(best)

if missing_targets:
    # Print likely candidates (by first word of the brand) to help fix the alias map.
    print("Missing target providers:", missing_targets)
    for mt in missing_targets:
        key = norm_name(mt).split()[0]
        candidates = providers_all[
            providers_all["provider_name_norm"].str.contains(key, na=False, regex=False)
        ].head(15)
        print(f"\nCandidates for '{mt}' (top 15 containing '{key}'):")
        print(candidates[["provider_id", "provider_name"]].to_string(index=False))
    raise RuntimeError("Some target providers not found. Update TARGET_PROVIDER_ALIASES.")

providers_target = pd.DataFrame(matched_rows)
providers_map = (
    providers_target[["provider_id", "provider_name", "canonical_provider"]]
    .drop_duplicates()
    .sort_values("canonical_provider")
    .reset_index(drop=True)
)

providers_lookup_path = os.path.join(DATA_RAW_DIR, "providers_lookup.parquet")
providers_map.to_parquet(providers_lookup_path, index=False)

print("\nSelected core providers:")
print(providers_map[["canonical_provider", "provider_id", "provider_name"]].to_string(index=False))


Selected core providers:
canonical_provider  provider_id            provider_name
Amazon Prime Video            9       Amazon Prime Video
     Apple TV Plus          350                 Apple TV
       Disney Plus          337              Disney Plus
           Netflix            8                  Netflix
    Paramount Plus         2616 Paramount Plus Essential
    Paramount Plus         2303   Paramount Plus Premium


## Provider Name Debug
Quick lookup to verify TMDB provider names for key brands. Useful when updating `TARGET_PROVIDER_ALIASES`.

In [4]:
# For each brand keyword, list up to 20 providers whose normalized name contains it.
for needle in ("apple", "paramount", "disney", "prime", "netflix"):
    name_hits = providers_all.loc[
        providers_all["provider_name_norm"].str.contains(needle, na=False),
        ["provider_id", "provider_name"],
    ]
    print(f"\n--- provider_name contains '{needle}' (up to 20) ---")
    print(name_hits.head(20).to_string(index=False))


--- provider_name contains 'apple' (up to 20) ---
 provider_id     provider_name
         350          Apple TV
        2034 Acorn TV Apple TV

--- provider_name contains 'paramount' (up to 20) ---
 provider_id            provider_name
        2616 Paramount Plus Essential
        2303   Paramount Plus Premium

--- provider_name contains 'disney' (up to 20) ---
 provider_id provider_name
         337   Disney Plus
         508     DisneyNOW

--- provider_name contains 'prime' (up to 20) ---
 provider_id      provider_name
           9 Amazon Prime Video

--- provider_name contains 'netflix' (up to 20) ---
 provider_id provider_name
           8       Netflix


## Discover Availability Snapshot
Queries `/discover/movie` and `/discover/tv` for each `(provider, country)` combination filtered to flatrate monetization.
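There is no mid-run checkpointing: the snapshot is written only after every page has been fetched. If `discover_snapshot.parquet` already exists, the fetch is skipped entirely (delete the file to force a refetch).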

> Runtime: up to ~80 minutes for a full run.

In [5]:
import time

BAR_WIDTH = 40
# TMDB's /discover endpoints reject `page` values above 500, so a query whose
# total_pages exceeds this can only be read partially.
DISCOVER_MAX_PAGE = 500


def _discover_params(country: str, provider_id: int, page: int) -> Dict[str, Any]:
    """Build query params for one page of a flatrate Discover request."""
    return {
        "watch_region": country,
        "with_watch_providers": provider_id,
        "with_watch_monetization_types": MONETIZATION_TYPE,
        "page": page,
    }


if os.path.exists(DISCOVER_PATH):
    # A snapshot already on disk short-circuits the whole (long) fetch.
    discover_df = pd.read_parquet(DISCOVER_PATH)
    print(f"Loaded existing discover snapshot: {len(discover_df)} rows — skipping fetch.")

else:
    # One job per (media_type, country, provider) combination.
    jobs = [
        {
            "media_type": media_type,
            "country": country,
            "provider_id": int(prow["provider_id"]),
            "provider_name": str(prow["provider_name"]),
            "canonical": str(prow["canonical_provider"]),
        }
        for media_type in MEDIA_TYPES
        for country in COUNTRIES
        for _, prow in providers_map.iterrows()
    ]

    # Fetch page 1 of every job up front. It reports total_pages (which sizes the
    # progress bar), and its results are reused so page 1 is never fetched twice.
    print("Probing to estimate total pages across all jobs...")
    probe_cache: Dict[Tuple[str, str, int], Dict[str, Any]] = {}
    truncated_jobs: List[Tuple[str, str, int, int]] = []
    total_pages_all = 0
    for j in jobs:
        key = (j["media_type"], j["country"], j["provider_id"])
        try:
            data = client.get(
                f"/discover/{j['media_type']}",
                params=_discover_params(j["country"], j["provider_id"], 1),
            )
            total_pages = int(data.get("total_pages", 1) or 1)
        except Exception as e:
            print(f"Probe failed for {j['media_type']}|{j['country']}|{j['provider_id']}: {e}")
            probe_cache[key] = {"first_page_results": [], "effective_total_pages": 0}
            continue

        if total_pages > DISCOVER_MAX_PAGE:
            truncated_jobs.append((*key, total_pages))
        eff = max(1, min(total_pages, MAX_PAGES_PER_QUERY or DISCOVER_MAX_PAGE, DISCOVER_MAX_PAGE))
        probe_cache[key] = {
            "first_page_results": data.get("results", []),
            "effective_total_pages": eff,
        }
        total_pages_all += eff

    for t_media, t_country, t_pid, t_pages in truncated_jobs:
        print(
            f"WARNING: {t_media}|{t_country}|{t_pid} reports {t_pages} pages; "
            f"TMDB serves at most {DISCOVER_MAX_PAGE}, so this catalogue will be truncated."
        )

    if total_pages_all == 0:
        print("No pages to fetch. Aborting discover.")
        discover_df = pd.DataFrame()
    else:
        print(f"Estimated total pages to fetch: {total_pages_all}")

        pages_done = 0
        start_time = time.time()
        rows_acc: List[Dict[str, Any]] = []
        failed_pages: List[Tuple[str, str, int, int]] = []

        for j in jobs:
            media_type, country, pid = j["media_type"], j["country"], j["provider_id"]
            cached = probe_cache.get((media_type, country, pid), {})

            for page in range(1, cached.get("effective_total_pages", 0) + 1):
                if page == 1:
                    results = cached.get("first_page_results", [])
                else:
                    try:
                        data = client.get(
                            f"/discover/{media_type}",
                            params=_discover_params(country, pid, page),
                        )
                        results = data.get("results", [])
                    except Exception as e:
                        # One bad page (after the client's own retries) must not discard
                        # the rest of a long run: record it and keep going.
                        print(f"\nPage fetch failed for {media_type}|{country}|{pid} page {page}: {e}")
                        failed_pages.append((media_type, country, pid, page))
                        results = []

                rows_acc.extend(
                    {
                        "tmdb_id": int(r["id"]),
                        "media_type": media_type,
                        "provider_id": pid,
                        "country": country,
                        "snapshot_date": SNAPSHOT_DATE,
                    }
                    for r in results
                    if r.get("id") is not None
                )

                pages_done += 1
                proportion = min(1.0, pages_done / float(total_pages_all))
                filled = int(proportion * BAR_WIDTH)
                bar = "=" * filled + "-" * (BAR_WIDTH - filled)
                elapsed = time.time() - start_time
                eta_seconds = max(0, total_pages_all - pages_done) * (elapsed / pages_done)
                print(
                    f"[DISCOVER] [{bar}] {pages_done}/{total_pages_all} pages — elapsed {int(elapsed)}s ETA {eta_seconds/60:.1f}m",
                    end='\r', flush=True,
                )

        print()
        discover_df = pd.DataFrame(rows_acc)

        if not discover_df.empty:
            # Keep one row per (title, provider, market).
            discover_df = discover_df.drop_duplicates(
                subset=["tmdb_id", "media_type", "provider_id", "country"]
            ).reset_index(drop=True)

        discover_df.to_parquet(DISCOVER_PATH, index=False)
        total_elapsed = time.time() - start_time
        print(f"Discover complete. Rows: {len(discover_df)} | Elapsed: {total_elapsed/60:.1f} min")
        if failed_pages:
            print(
                f"WARNING: {len(failed_pages)} page(s) could not be fetched and are missing "
                f"from the snapshot. Delete {DISCOVER_PATH} and rerun to refetch."
            )

availability_df = discover_df
availability_df.head()

Loaded existing discover snapshot: 127399 rows — skipping fetch.


Unnamed: 0,tmdb_id,media_type,provider_id,country,snapshot_date
0,1168190,movie,9,US,2026-02-21
1,1317672,movie,9,US,2026-02-21
2,755898,movie,9,US,2026-02-21
3,1503900,movie,9,US,2026-02-21
4,14874,movie,9,US,2026-02-21


## Unique Titles List
Deduplicate discover results to a unique `(tmdb_id, media_type)` list. Metadata is pulled once per title regardless of how many providers or markets carry it.

In [6]:
if discover_df.empty:
    raise RuntimeError("discover_df is empty. Check TMDB discover filters and provider IDs.")

# One row per (tmdb_id, media_type), regardless of how many providers/markets list it.
unique_titles = discover_df[["tmdb_id", "media_type"]].drop_duplicates()
unique_titles = unique_titles.sort_values(["media_type", "tmdb_id"]).reset_index(drop=True)

# Optional debug cap on how many titles get a metadata pull.
if MAX_TITLES_METADATA:
    unique_titles_for_pull = unique_titles.head(MAX_TITLES_METADATA)
else:
    unique_titles_for_pull = unique_titles

print("Unique titles total:", len(unique_titles))
print("Unique titles to pull metadata for:", len(unique_titles_for_pull))
unique_titles_for_pull.head()

Unique titles total: 46973
Unique titles to pull metadata for: 46973


Unnamed: 0,tmdb_id,media_type
0,5,movie
1,6,movie
2,11,movie
3,12,movie
4,13,movie


## Metadata Helper Functions
Normalizes movie and TV detail responses into a unified flat schema. TV runtime is estimated via median of `episode_run_time`.

In [7]:
def safe_year(date_str: Optional[str]) -> Optional[int]:
    """Return the year parsed from the first four characters of ``date_str``.

    Returns None if ``date_str`` is not a string, is shorter than four
    characters, or its first four characters do not parse as an int.
    """
    usable = isinstance(date_str, str) and len(date_str) >= 4
    if not usable:
        return None
    try:
        year = int(date_str[:4])
    except Exception:
        return None
    return year


def median_or_none(values: Any) -> Optional[int]:
    """Return the median of the finite numbers in ``values``, truncated to int.

    ``values`` is expected to be a list, such as TMDB's ``episode_run_time``.
    Non-numeric entries, booleans, NaN and infinities are ignored. The result
    is None when the input is None or not a list, is empty, or holds no usable
    numbers. Previously NaN/inf made ``int()`` raise, and booleans were counted
    as runtimes.
    """
    import math

    if not isinstance(values, list):
        return None
    nums = [
        v
        for v in values
        if isinstance(v, (int, float)) and not isinstance(v, bool) and math.isfinite(v)
    ]
    if not nums:
        return None
    # int() truncates (median 37.5 -> 37). Kept as-is so re-pulled values stay
    # comparable with earlier runs.
    return int(pd.Series(nums).median())


def normalize_origin_country(data: Dict[str, Any], media_type: str) -> Optional[str]:
    """Pick a single origin country code from a TMDB details payload.

    ``origin_country`` wins: its first element when it is a non-empty list,
    or the value itself when it is a non-empty string. Otherwise the
    ``iso_3166_1`` of the first ``production_countries`` entry is used, when
    that entry is a dict. Returns None if neither source is usable.
    ``media_type`` is accepted for call-site symmetry and is not used.
    """
    origin = data.get("origin_country")
    if isinstance(origin, (list, str)) and len(origin) > 0:
        return origin[0] if isinstance(origin, list) else origin

    productions = data.get("production_countries")
    first = productions[0] if isinstance(productions, list) and productions else None
    return first.get("iso_3166_1") if isinstance(first, dict) else None


def normalize_genres(data: Dict[str, Any]) -> Tuple[Optional[str], str]:
    """Return ``(primary_genre, "A|B|...")`` from a TMDB ``genres`` list.

    Only dict entries with a truthy ``name`` are kept, in their original order.
    The primary genre is the first kept name, or None if there is none. A
    missing or non-list ``genres`` field yields ``(None, "")``.
    """
    raw = data.get("genres") or []
    if not isinstance(raw, list):
        return None, ""

    genre_names: List[str] = []
    for entry in raw:
        if isinstance(entry, dict) and entry.get("name"):
            genre_names.append(entry["name"])

    first = genre_names[0] if len(genre_names) > 0 else None
    return first, "|".join(genre_names)


def fetch_title_details(tmdb_id: int, media_type: str) -> Dict[str, Any]:
    """Fetch ``/{media_type}/{tmdb_id}`` and flatten it into one metadata row.

    Movies read ``title`` / ``release_date`` / ``runtime``. Any other media
    type is treated as TV and reads ``name`` / ``first_air_date``, with
    runtime taken as the median of ``episode_run_time``. The returned dict's
    keys (and their order) form the raw metadata schema.
    """
    payload = client.get(f"/{media_type}/{tmdb_id}", params={})

    if media_type == "movie":
        title_key, date_key = "title", "release_date"
        runtime_min = payload.get("runtime")
    else:
        title_key, date_key = "name", "first_air_date"
        runtime_min = median_or_none(payload.get("episode_run_time"))
    release_date = payload.get(date_key)

    primary_genre, genres_str = normalize_genres(payload)
    origin_country = normalize_origin_country(payload, media_type)

    record: Dict[str, Any] = {
        "tmdb_id": tmdb_id,
        "media_type": media_type,
        "title": payload.get(title_key),
        "release_date": release_date,
        "release_year": safe_year(release_date),
        "primary_genre": primary_genre,
        "genres": genres_str,
        "origin_country": origin_country,
        "original_language": payload.get("original_language"),
        "vote_average": payload.get("vote_average"),
        "vote_count": payload.get("vote_count"),
        "popularity": payload.get("popularity"),
        "runtime_min": runtime_min,
        "snapshot_date": SNAPSHOT_DATE,
    }
    return record

## Title Metadata Pull (Checkpoint + Resume)
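Fetches detailed metadata for each unique `(tmdb_id, media_type)`. A checkpoint is saved every `SAVE_EVERY` successfully fetched titles, so the pull is safe to interrupt and restart. Titles whose fetch fails are not stored and are retried on the next run.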

> Runtime: up to ~6 hours for ~46k titles. There is one request per title, and each request adds a 0.1 s throttle sleep on top of network latency.

In [8]:
import time
from pathlib import Path

CHECKPOINT_PATH = os.path.join(DATA_RAW_DIR, "titles_metadata_raw.parquet")
SAVE_EVERY = 500
BAR_WIDTH = 40


def _format_eta(seconds: float) -> str:
    """Render a duration as 'Hh MMm SSs', or 'Mm SSs' when under an hour."""
    hours, rem = divmod(int(seconds), 3600)
    minutes, secs = divmod(rem, 60)
    return f"{hours}h {minutes:02d}m {secs:02d}s" if hours > 0 else f"{minutes}m {secs:02d}s"


def _append_checkpoint(stored: pd.DataFrame, rows: List[Dict[str, Any]]) -> pd.DataFrame:
    """Append ``rows`` to ``stored``, write the result to CHECKPOINT_PATH, and return it."""
    combined = pd.concat([stored, pd.DataFrame(rows)], ignore_index=True)
    combined.to_parquet(CHECKPOINT_PATH, index=False)
    return combined


if os.path.exists(CHECKPOINT_PATH):
    titles_meta_df = pd.read_parquet(CHECKPOINT_PATH)
    done = set(zip(titles_meta_df["tmdb_id"].astype(int), titles_meta_df["media_type"].astype(str)))
    print(f"Resuming from checkpoint. Already have {len(done)} (tmdb_id, media_type) entries.")
else:
    titles_meta_df = pd.DataFrame()
    done = set()
    print("No checkpoint found. Starting fresh.")

# Deterministic (media_type, tmdb_id) order, so the MAX_TITLES_METADATA debug cap
# always selects the same subset; titles already in the checkpoint are skipped.
candidates = (
    discover_df[["tmdb_id", "media_type"]]
    .drop_duplicates()
    .sort_values(["media_type", "tmdb_id"])
    .reset_index(drop=True)
)
if MAX_TITLES_METADATA:
    candidates = candidates.head(MAX_TITLES_METADATA)

candidate_keys = [(int(t), str(m)) for t, m in zip(candidates["tmdb_id"], candidates["media_type"])]
pending = [key for key in candidate_keys if key not in done]

total_to_process = len(pending)
print(f"Titles to process now: {total_to_process}")

start_time = time.time()
processed_count = 0  # successful fetches this run
attempted = 0        # successes + failures, drives the progress bar
failed: List[Tuple[int, str]] = []
new_rows: List[Dict[str, Any]] = []

for tmdb_id, media_type in pending:
    attempted += 1
    try:
        new_rows.append(fetch_title_details(tmdb_id, media_type))
    except Exception as e:
        # Not written to the checkpoint, so the next run retries this title.
        print(f"\nERROR on {tmdb_id} ({media_type}): {e}")
        failed.append((tmdb_id, media_type))
    else:
        processed_count += 1

    proportion = min(1.0, attempted / float(total_to_process))
    filled = int(proportion * BAR_WIDTH)
    bar = "=" * filled + "-" * (BAR_WIDTH - filled)
    elapsed = time.time() - start_time
    eta = max(0, total_to_process - attempted) * (elapsed / attempted)
    print(
        f"[METADATA] [{bar}] {attempted}/{total_to_process} — elapsed {int(elapsed)}s ETA {_format_eta(eta)}",
        end='\r', flush=True,
    )

    if len(new_rows) >= SAVE_EVERY:
        titles_meta_df = _append_checkpoint(titles_meta_df, new_rows)
        new_rows = []
        print(f"\nCheckpoint saved at {processed_count} new titles.")

if new_rows:
    titles_meta_df = _append_checkpoint(titles_meta_df, new_rows)

total_elapsed = time.time() - start_time
print("\nMetadata pull complete.")
print(f"Total titles now stored: {len(titles_meta_df)}")
if failed:
    print(f"Failed this run (retried on next run): {len(failed)}")
print(f"Total elapsed time: {total_elapsed/60:.1f} minutes")

Resuming from checkpoint. Already have 46973 (tmdb_id, media_type) entries.
Titles to process now: 0

Metadata pull complete.
Total titles now stored: 46973
Total elapsed time: 0.0 minutes


## RAW Manifest
Lightweight versioning file describing what was ingested and with which parameters. Written to `data/raw/manifest.json`.

In [9]:
# Run manifest: ingestion parameters plus the row count of each RAW artifact.
manifest = {
    "snapshot_date": SNAPSHOT_DATE,
    "source": "TMDB",
    "countries": COUNTRIES,
    "media_types": MEDIA_TYPES,
    "monetization_filter": MONETIZATION_TYPE,
    "providers": providers_map.to_dict(orient="records"),
    "debug_limits": {
        "max_pages_per_query": MAX_PAGES_PER_QUERY,
        "max_titles_metadata": MAX_TITLES_METADATA,
    },
    "row_counts": {
        # providers_map is the frame written to providers_lookup.parquet.
        "providers_lookup": int(len(providers_map)),
        "discover_snapshot": int(len(discover_df)),
        "titles_metadata_raw": int(len(titles_meta_df)),
    },
    "notes": [
        "Availability inferred via TMDB Discover endpoints filtered by watch providers.",
        "Only flatrate (subscription streaming) availability is included.",
        "TV runtime approximated via median of episode_run_time when available.",
    ],
}

manifest_path = os.path.join(DATA_RAW_DIR, "manifest.json")
with open(manifest_path, "w", encoding="utf-8") as f:
    json.dump(manifest, f, indent=2)

print("Manifest written to:", manifest_path)

Manifest written to: c:\Users\matt\OneDrive\Desktop\Data_Projects\Streaming_Benchmark_Project\data\raw\manifest.json


## Sanity Checks
Quick printed summaries for manual review: distinct titles per provider × market, and the share of missing values in each raw metadata column. Nothing is asserted automatically.

In [10]:
# Distinct titles per provider x country. TMDB movie and TV ids are separate
# namespaces (the metadata is keyed on the pair too), so a title is identified
# by (tmdb_id, media_type); counting tmdb_id alone would merge a movie and a
# show that share an id.
avail_check = (
    discover_df[["country", "provider_id", "tmdb_id", "media_type"]]
    .drop_duplicates()
    .groupby(["country", "provider_id"])
    .size()
    .reset_index(name="distinct_titles")
    .merge(providers_map[["provider_id", "canonical_provider"]], on="provider_id", how="left")
    .sort_values(["country", "distinct_titles"], ascending=[True, False])
)
print(avail_check.head(20).to_string(index=False))

# Missingness report on raw metadata: share of nulls per column, highest first.
missing_report = titles_meta_df.isna().mean().sort_values(ascending=False)
print(missing_report.head(15))

country  provider_id  distinct_titles canonical_provider
     DE            8             8693            Netflix
     DE            9             8010 Amazon Prime Video
     DE          337             3482        Disney Plus
     DE          350              309      Apple TV Plus
     ES            8             9385            Netflix
     ES          337             3519        Disney Plus
     ES          350              309      Apple TV Plus
     FR            8             8644            Netflix
     FR          337             3612        Disney Plus
     FR         2303              877     Paramount Plus
     FR          350              305      Apple TV Plus
     GB            9            13387 Amazon Prime Video
     GB            8             8853            Netflix
     GB          337             4058        Disney Plus
     GB         2303              932     Paramount Plus
     GB          350              311      Apple TV Plus
     IT            8           