In [1]:
#Install + Imports (run only once per environment)
%pip install aiohttp nest_asyncio pandas pyarrow python-dotenv tqdm

import os, asyncio, random, json, time, math
from pathlib import Path
from typing import List, Dict, Any
import nest_asyncio, aiohttp
import pandas as pd
from tqdm import tqdm
nest_asyncio.apply()

Note: you may need to restart the kernel to use updated packages.


In [2]:
# Config & Paths
from dotenv import load_dotenv

# Merge a local .env file (when one exists) into the process environment so
# the API token does not have to be pasted into the notebook itself.
load_dotenv()

# TMDB API token, sent as a Bearer credential in HEADERS below. Set TMDB_TOKEN
# in the environment or in .env; the placeholder only keeps this cell runnable.
TMDB_TOKEN = os.getenv("TMDB_TOKEN", "<YOUR_TOKEN>")

# Number of requests allowed in flight at once (used to size the semaphore).
# Overridable through TMDB_CONCURRENCY; clamped to at least 1 because a
# semaphore created with 0 permits would block every request forever.
CONCURRENCY = max(1, int(os.getenv("TMDB_CONCURRENCY", 4)))  # max 4 for safety

BASE = "https://api.themoviedb.org/3"
HEADERS = {"Authorization": f"Bearer {TMDB_TOKEN}", "accept": "application/json"}

# Output folders: raw API payloads vs. derived parquet tables.
RAW_DIR = Path("data/raw")
RAW_DIR.mkdir(parents=True, exist_ok=True)
PROC_DIR = Path("data/processed")
PROC_DIR.mkdir(parents=True, exist_ok=True)

# Upper bound on how many movie ids the sampling cell keeps.
SAMPLE_SIZE = 19990

In [3]:
# HTTP helpers with retry + rate-limit guard
class TMDBClient:
    """Thin async GET helper that retries throttled and failing requests.

    Attributes:
        s: session object whose ``get(url, headers=..., params=...)`` returns
            an async context manager yielding the response.
        sem: semaphore shared by all callers; it bounds how many requests are
            in flight at the same time.
    """

    def __init__(self, session, sem):
        self.s = session
        self.sem = sem

    @staticmethod
    def _retry_after_seconds(raw, default=2.0):
        """Turn a ``Retry-After`` header value into a non-negative delay.

        Falls back to ``default`` when the header is missing or is not a
        finite, non-negative number (for example when it is an HTTP-date),
        instead of letting the conversion error abort the request.
        """
        try:
            delay = float(raw)
        except (TypeError, ValueError):
            return default
        # The chained comparison is False for NaN, negatives and infinity.
        if 0 <= delay < float("inf"):
            return delay
        return default

    async def get(self, url, params=None, max_retries=3):
        """GET ``url`` and return the decoded JSON body.

        A 429 response waits for the advertised ``Retry-After`` (plus a 0.5 s
        margin); a 5xx response waits ``2 ** attempt`` seconds. Both count
        against ``max_retries``. Any other error status is raised immediately
        through ``raise_for_status()``.

        Args:
            url: absolute URL to request.
            params: optional query parameters passed to the session.
            max_retries: total number of attempts before giving up.

        Returns:
            The parsed JSON payload of the first successful response.

        Raises:
            RuntimeError: every attempt ended in a 429 or 5xx response.
        """
        for attempt in range(max_retries):
            async with self.sem:                       # cap concurrent requests
                async with self.s.get(url, headers=HEADERS, params=params) as r:
                    if r.status == 429:                # TMDB rate-limit hit
                        delay = self._retry_after_seconds(r.headers.get("Retry-After")) + 0.5
                    elif r.status >= 500:
                        delay = 2 ** attempt
                    else:
                        r.raise_for_status()
                        return await r.json()
                # The response is closed before backing off, but the semaphore
                # slot stays held so a throttled request also slows its peers.
                # No point sleeping when no further attempt will follow.
                if attempt < max_retries - 1:
                    await asyncio.sleep(delay)
        raise RuntimeError(f"Failed GET {url} after {max_retries} retries")


In [4]:
# Iterator over discover/movie to collect IDs
async def discover_ids(year: int, client: TMDBClient) -> List[int]:
    """Walk the discover/movie listing for ``year`` and return every movie id.

    Pages are requested one after another until the page number reaches the
    ``total_pages`` value reported by the API, or 500 pages have been read.

    Args:
        year: value sent as ``primary_release_year``.
        client: object whose awaitable ``get(url, params)`` returns the
            decoded JSON payload.

    Returns:
        The ids in the order the pages returned them (duplicates possible).
    """
    endpoint = f"{BASE}/discover/movie"
    collected: List[int] = []
    for page_no in range(1, 501):    # TMDB hard cap = 500 pages
        query = {
            "primary_release_year": year,
            "page": page_no,
            "sort_by": "popularity.asc",  # any sort works
        }
        payload = await client.get(endpoint, query)
        collected += [movie["id"] for movie in payload["results"]]
        if page_no >= payload["total_pages"]:
            break
    return collected


In [5]:
# Collect all movie IDs across 1900-current
async def collect_all_ids(start=1900, end=2025):
    """Gather the distinct movie ids for every year in ``start``..``end``.

    One ``discover_ids`` coroutine is created per year; all of them share a
    single HTTP session and a semaphore sized by ``CONCURRENCY``. The sorted,
    de-duplicated ids are written to ``all_movie_ids.parquet`` under
    ``PROC_DIR`` and also returned.

    Args:
        start: first release year to query (inclusive).
        end: last release year to query (inclusive).

    Returns:
        Sorted list of unique movie ids.
    """
    gathered = []
    async with aiohttp.ClientSession() as session:
        client = TMDBClient(session, asyncio.Semaphore(CONCURRENCY))
        per_year = [discover_ids(year, client) for year in range(start, end + 1)]
        progress = tqdm(asyncio.as_completed(per_year), total=len(per_year), desc="years")
        for finished in progress:
            gathered.extend(await finished)

    uniq_ids = sorted(set(gathered))
    print(f"Collected {len(uniq_ids):,} unique movie IDs")

    # Stored as a one-column frame named "movie_id".
    id_frame = pd.DataFrame({"movie_id": uniq_ids})
    id_frame.to_parquet(PROC_DIR / "all_movie_ids.parquet")
    return uniq_ids

# Only release years from 2024 onward are collected here; `end` keeps its
# default. NOTE(review): asyncio.run inside a notebook presumably relies on
# the nest_asyncio.apply() call made in the imports cell -- confirm.
all_ids = asyncio.run(collect_all_ids(start=2024))


years: 100%|██████████████████████████████████████| 2/2 [00:11<00:00,  5.90s/it]

Collected 19,995 unique movie IDs





In [6]:
# Sampling
# Keep every collected id when fewer than SAMPLE_SIZE are available; otherwise
# draw SAMPLE_SIZE distinct ids at random. No seed is set in this cell, so the
# drawn subset can differ from run to run.
if SAMPLE_SIZE > len(all_ids):
    sample_ids = all_ids
else:
    sample_ids = random.sample(all_ids, SAMPLE_SIZE)

# Persist the chosen ids as a one-column frame named "movie_id".
pd.DataFrame({"movie_id": sample_ids}).to_parquet(PROC_DIR / "sample_ids.parquet")
print("Sample saved:", len(sample_ids))


Sample saved: 19990


In [7]:
# Fetch full details (+keywords+providers) for one ID
async def fetch_one(mid: int, client: TMDBClient) -> Dict[str, Any]:
    """Download the detail record of movie ``mid``.

    The request asks for the ``keywords`` and ``watch/providers`` sections to
    be appended to the response. Any exception is swallowed and reported as a
    stub ``{"id": mid, "__error__": <message>}`` so one failing id does not
    abort a bulk download.

    Args:
        mid: movie id placed in the request path.
        client: object whose awaitable ``get(url, params=...)`` returns the
            decoded JSON payload.

    Returns:
        The decoded payload, or the error stub described above.
    """
    try:
        detail_url = f"{BASE}/movie/{mid}"
        query = {"language": "en-US", "append_to_response": "keywords,watch/providers"}
        payload = await client.get(detail_url, params=query)
    except Exception as exc:
        return {"id": mid, "__error__": str(exc)}
    else:
        return payload


In [8]:
# Bulk fetch with asyncio

async def fetch_many(ids: List[int], batch_name="movies"):
    """Fetch the detail record of every id in ``ids`` concurrently.

    All requests share one HTTP session and a semaphore sized by
    ``CONCURRENCY``. Results are collected in completion order, which is not
    necessarily the order of ``ids``.

    Args:
        ids: movie ids to download.
        batch_name: label shown on the progress bar.

    Returns:
        List with one ``fetch_one`` result per id.
    """
    results = []
    async with aiohttp.ClientSession() as session:
        client = TMDBClient(session, asyncio.Semaphore(CONCURRENCY))
        pending = [fetch_one(movie_id, client) for movie_id in ids]
        progress = tqdm(asyncio.as_completed(pending), total=len(pending), desc=batch_name)
        for done in progress:
            results.append(await done)
    return results

raw_movies = asyncio.run(fetch_many(sample_ids))
# Persist raw JSONL (one JSON document per line). The encoding is pinned to
# UTF-8 because ensure_ascii=False emits non-ASCII characters verbatim, which
# a platform-default encoding may be unable to represent.
with open(RAW_DIR / "movies_sample.jsonl", "w", encoding="utf-8") as fp:
    for item in raw_movies:
        fp.write(json.dumps(item, ensure_ascii=False) + "\n")


movies: 100%|█████████████████████████████| 19990/19990 [38:41<00:00,  8.61it/s]


In [9]:
# Normalize into clean tables
def explode_movies(raw: List[Dict[str, Any]]):
    """Flatten raw movie payloads into three tidy tables.

    Documents carrying an ``"__error__"`` key are skipped. Missing or null
    ``keywords``, ``watch/providers`` and ``genres`` sections are treated as
    empty.

    Args:
        raw: decoded movie payloads; each usable one must have an ``"id"``.

    Returns:
        A ``(movies, keywords, providers)`` tuple of DataFrames:
          * movies    - one row per movie, with list-valued ``genres``,
                        ``keywords_list`` and ``providers_list`` columns;
          * keywords  - one row per (movie, keyword);
          * providers - one row per (movie, country, access type, provider).
        Each frame always carries its full set of columns, even when it has
        no rows, so the files written from them keep a stable schema.
    """
    basic_cols = [
        "movie_id", "title", "tagline", "overview", "vote_average",
        "runtime", "release_date", "genres", "keywords_list", "providers_list",
    ]
    kw_cols = ["movie_id", "keyword"]
    prov_cols = ["movie_id", "country", "access", "provider_id", "provider_name"]

    records_basic, records_kw, records_prov = [], [], []
    for doc in raw:
        if "__error__" in doc:              # failed download, nothing to flatten
            continue
        mid = doc["id"]

        # --- keywords block -------------------------------------------------
        kw_objs = (doc.get("keywords") or {}).get("keywords") or []
        kw_names = [kw["name"] for kw in kw_objs]
        records_kw.extend({"movie_id": mid, "keyword": name} for name in kw_names)

        # --- providers block ------------------------------------------------
        prov_results = (doc.get("watch/providers") or {}).get("results") or {}
        prov_name_set = set()               # distinct names across all countries
        for country, blob in prov_results.items():
            for ptype in ("flatrate", "ads", "buy", "rent"):
                for prov in blob.get(ptype) or []:
                    prov_name_set.add(prov["provider_name"])
                    records_prov.append({
                        "movie_id":      mid,
                        "country":       country,
                        "access":        ptype,
                        "provider_id":   prov["provider_id"],
                        "provider_name": prov["provider_name"],
                    })

        # --- basic ----------------------------------------------------------
        records_basic.append({
            "movie_id": mid,
            "title": doc.get("title"),
            "tagline": doc.get("tagline"),
            "overview": doc.get("overview"),
            "vote_average": doc.get("vote_average"),
            "runtime": doc.get("runtime"),
            "release_date": doc.get("release_date"),
            "genres": [g["name"] for g in doc.get("genres") or []],
            "keywords_list": kw_names,
            "providers_list": sorted(prov_name_set),
        })

    # Explicit column lists keep the schema intact when a record list is empty.
    return (
        pd.DataFrame(records_basic, columns=basic_cols),
        pd.DataFrame(records_kw, columns=kw_cols),
        pd.DataFrame(records_prov, columns=prov_cols),
    )

# Flatten the raw payloads, then store each resulting table as parquet.
df_movies, df_kw, df_prov = explode_movies(raw_movies)
for frame, filename in (
    (df_movies, "movies.parquet"),
    (df_kw, "keywords.parquet"),
    (df_prov, "providers.parquet"),
):
    frame.to_parquet(PROC_DIR / filename)


In [10]:
# Sanity checks
# Print the row count of each table, then preview the movies table.
tables = {"movies": df_movies, "keywords": df_kw, "providers": df_prov}
for name, df in tables.items():
    print(f"{name:10s}", len(df))
df_movies.head()


movies     19986
keywords   10329
providers  4274


Unnamed: 0,movie_id,title,tagline,overview,vote_average,runtime,release_date,genres,keywords_list,providers_list
0,1363475,Into the Reds Tale,,An abomination mimic creature enters the story...,0.0,4,2025-06-27,"[Animation, Horror, Fantasy]",[],[]
1,1355895,Pader,,"A young boy, clad in a dunce cap and lion's pa...",0.0,8,2024-09-01,[],[],[]
2,1285711,Dadfest,,The long-simmering tension between two sisters...,0.0,0,2024-02-21,[],[],[]
3,1495436,The Deadline,You can't escape it!,A writer struggles to turn in his screenplay b...,0.0,11,2025-06-02,[Comedy],[screenwriting],[]
4,1481039,Pollastra All Along,,,10.0,0,2025-05-11,"[Comedy, Adventure, Music, Documentary, Mystery]","[psicologia, hilarious, viaggio]",[]


In [11]:
# Test scripts
# Smoke-test the fetch pipeline on a handful of ids. The results go to their
# own variable and file so the full-sample `raw_movies` and
# movies_sample.jsonl produced earlier are not overwritten by three records.
movies = [1, 2, 3]

test_raw_movies = asyncio.run(fetch_many(movies))
# Persist raw JSONL (UTF-8, because ensure_ascii=False keeps non-ASCII text)
with open(RAW_DIR / "movies_test.jsonl", "w", encoding="utf-8") as fp:
    for item in test_raw_movies:
        fp.write(json.dumps(item, ensure_ascii=False) + "\n")

movies: 100%|█████████████████████████████████████| 3/3 [00:00<00:00, 13.78it/s]
