# How to Run

## Prerequisites
1. Python >= 3.9
2. TMDB v3 API key
3. Optional: `requests` library (will fallback to urllib if not available)

## Setup Steps

### 1. Get TMDB API Key
- Register at https://www.themoviedb.org/
- Go to Settings > API > Request API Key
- Copy your API key (v3 auth)

### 2. Set Environment Variable
**Windows (PowerShell):**
```powershell
$env:TMDB_API_KEY = "your_api_key_here"
```

**Windows (CMD):**
```cmd
set TMDB_API_KEY=your_api_key_here
```

**Linux/Mac:**
```bash
export TMDB_API_KEY=your_api_key_here
```

### 3. Install Dependencies (Optional)
```bash
pip install requests
```

### 4. Run the Notebook
1. Open this notebook in Jupyter or VS Code
2. **Important**: Update the `TMDB_API_KEY` variable in Cell 2 with your actual API key
3. Restart the kernel after setting the API key
4. Run cells sequentially from top to bottom or run all
5. For testing: Run Cell 6 (sample run with 150 movies)
6. For full harvest: Uncomment and run Cell 7 (20,000+ movies)

## Output Files
- `movies_full.jsonl`: JSONL format checkpoint file
- `movies_full.csv`: CSV format with all enriched movie data

## Notes
- Adjust `SUMMARY_TARGET`, `SUMMARY_START_YEAR` in Cell 2 to customize data collection
- Increase delay parameters if you encounter rate limiting (HTTP 429)
- The notebook supports resume from checkpoint - rerun to continue from where it stopped

# TMDB Movie Dataset Harvest Notebook

This notebook helps you build a local movie dataset (summary + enriched metadata) from TMDB API in a maintainable, restartable way.

Contents:
1. Configuration & API Key
2. Core HTTP utilities (retry, backoff)
3. Summary collection via /discover (segmented by year)
4. Detail enrichment (append_to_response blocks)
5. Checkpointed harvesting (JSONL + CSV)
6. Usage tips & performance notes

Prerequisites:
- TMDB v3 API key set as environment variable `TMDB_API_KEY`.
- Python >= 3.9. Optional: `requests` library (will fallback to urllib).

Set API key (PowerShell / Windows):
```powershell
$env:TMDB_API_KEY = "YOUR_TMDB_API_KEY"
```
Restart kernel after setting if it wasn't previously defined.


In [None]:
# 1. Imports & Configuration
import os, json, time, csv, math, pathlib
from typing import Dict, Any, List, Optional, Iterable, Sequence
from datetime import datetime

try:
    import requests  # type: ignore
    HAVE_REQUESTS = True
except Exception:
    import urllib.request, urllib.parse
    HAVE_REQUESTS = False
TMDB_API_KEY = ""
# TMDB_API_KEY = os.getenv("TMDB_API_KEY")
if not TMDB_API_KEY:
    raise RuntimeError("Missing TMDB_API_KEY. Set environment variable before proceeding.")

BASE_URL = "https://api.themoviedb.org/3"
LANGUAGE = "en-US"

# Tuning parameters (adjust as needed)
SUMMARY_TARGET = 20000         # number of summary rows desired
SUMMARY_START_YEAR = 2000      # earliest release year
SUMMARY_DELAY = 0.12           # delay between discover page requests
DETAIL_APPEND_BLOCKS = [
    "credits","keywords","release_dates","videos","external_ids"
]
DETAIL_RATE_DELAY = 0.12       # delay between detail requests
DETAIL_BATCH_SIZE = 500        # flush interval for checkpoint files
CHECKPOINT_PREFIX = "movies_full"  # base name for output files


In [None]:
# 2. Core HTTP utilities (retry, backoff)

def http_get(path: str, params: Optional[Dict[str, Any]] = None,
             retries: int = 3, backoff: float = 1.5, timeout: int = 30) -> Dict[str, Any]:
    """
    Thực hiện HTTP GET request đến TMDB API với retry và backoff logic.
    
    Input:
        - path (str): Đường dẫn API endpoint (ví dụ: "/movie/123")
        - params (Optional[Dict[str, Any]]): Dictionary các tham số query string
        - retries (int): Số lần retry khi gặp lỗi (mặc định: 3)
        - backoff (float): Hệ số backoff cho mỗi lần retry (mặc định: 1.5)
        - timeout (int): Timeout cho request tính bằng giây (mặc định: 30)
    
    Output:
        - Dict[str, Any]: JSON response từ API dưới dạng dictionary
    
    Mô tả:
        Hàm gọi API TMDB với cơ chế retry tự động khi gặp lỗi. Tự động thêm api_key
        vào params. Xử lý rate limiting (HTTP 429) bằng cách đợi theo Retry-After header.
        Hỗ trợ cả thư viện requests và urllib (fallback).
    """
    params = dict(params or {})
    params["api_key"] = TMDB_API_KEY

    if HAVE_REQUESTS:
        url = f"{BASE_URL}{path}"
        last_exc: Optional[Exception] = None
        for attempt in range(retries):
            try:
                resp = requests.get(url, params=params, timeout=timeout)
                if resp.status_code == 429:
                    wait = int(resp.headers.get("Retry-After", 2))
                    time.sleep(wait)
                    continue
                resp.raise_for_status()
                return resp.json()
            except Exception as e:
                last_exc = e
                if attempt == retries - 1:
                    raise
                time.sleep(backoff ** attempt)
        assert False, f"Unreachable: {last_exc}"
    else:
        qs = urllib.parse.urlencode(params)
        url = f"{BASE_URL}{path}?{qs}"
        last_exc: Optional[Exception] = None
        for attempt in range(retries):
            try:
                with urllib.request.urlopen(url, timeout=timeout) as resp:
                    status = resp.getcode()
                    if status == 429:
                        wait = int(resp.headers.get("Retry-After", "2"))
                        time.sleep(wait)
                        continue
                    if status >= 400:
                        raise RuntimeError(f"HTTP {status}: {url}")
                    data = resp.read()
                    return json.loads(data.decode("utf-8"))
            except Exception as e:
                last_exc = e
                if attempt == retries - 1:
                    raise
                time.sleep(backoff ** attempt)
        assert False, f"Unreachable: {last_exc}"


In [None]:
# 3. Discover summary collection (segmented by year)

def discover_movies(target_count: int,
                    start_year: int,
                    end_year: int,
                    per_page_delay: float = 0.1,
                    sort_by: str = "popularity.desc") -> List[Dict[str, Any]]:
    """
    Thu thập danh sách tóm tắt phim từ TMDB discover API, phân đoạn theo năm.
    
    Input:
        - target_count (int): Số lượng phim mục tiêu cần thu thập
        - start_year (int): Năm bắt đầu (ví dụ: 2000)
        - end_year (int): Năm kết thúc (ví dụ: 2025)
        - per_page_delay (float): Thời gian delay giữa các request (giây), mặc định: 0.1
        - sort_by (str): Tiêu chí sắp xếp, mặc định: "popularity.desc"
    
    Output:
        - List[Dict[str, Any]]: Danh sách các dictionary chứa thông tin tóm tắt phim
    
    Mô tả:
        Hàm duyệt qua các năm từ start_year đến end_year, với mỗi năm gọi discover API
        để lấy danh sách phim. Tự động phân trang và loại bỏ các phim trùng lặp.
        Dừng khi đạt đủ target_count hoặc hết dữ liệu.
    """
    collected: List[Dict[str, Any]] = []
    seen_ids = set()
    for year in range(start_year, end_year + 1):
        if len(collected) >= target_count:
            break
        page = 1
        while page <= 500 and len(collected) < target_count:
            payload = http_get(
                "/discover/movie",
                {
                    "language": LANGUAGE,
                    "sort_by": sort_by,
                    "include_adult": False,
                    "include_video": False,
                    "primary_release_year": year,
                    "page": page,
                },
            )
            results = payload.get("results", [])
            if not results:
                break
            for mv in results:
                mid = mv.get("id")
                if mid is None or mid in seen_ids:
                    continue
                seen_ids.add(mid)
                collected.append(mv)
                if len(collected) >= target_count:
                    break
            page += 1
            if per_page_delay:
                time.sleep(per_page_delay)
    return collected


In [None]:
# 4. Detail fetch + flatten

def fetch_movie_detail(movie_id: int,
                       append_blocks: Optional[Sequence[str]] = None) -> Dict[str, Any]:
    """
    Lấy thông tin chi tiết của một phim từ TMDB API.
    
    Input:
        - movie_id (int): ID của phim trên TMDB
        - append_blocks (Optional[Sequence[str]]): Danh sách các block dữ liệu bổ sung
          (ví dụ: ["credits", "keywords", "videos"])
    
    Output:
        - Dict[str, Any]: Dictionary chứa toàn bộ thông tin chi tiết của phim
    
    Mô tả:
        Hàm gọi API endpoint /movie/{movie_id} để lấy thông tin đầy đủ của phim.
        Sử dụng tham số append_to_response để lấy thêm các block dữ liệu như
        credits, keywords, release_dates trong một request duy nhất.
    """
    append = ",".join(append_blocks or [])
    params = {"language": LANGUAGE}
    if append:
        params["append_to_response"] = append
    return http_get(f"/movie/{movie_id}", params)


def flatten_detail(d: Dict[str, Any]) -> Dict[str, Any]:
    """
    Chuyển đổi JSON chi tiết phim phức tạp thành dictionary phẳng cho CSV export.
    
    Input:
        - d (Dict[str, Any]): Dictionary chứa dữ liệu chi tiết phim từ API
    
    Output:
        - Dict[str, Any]: Dictionary đã được làm phẳng với các field đơn giản
          và các list được join thành string
    
    Mô tả:
        Hàm trích xuất và làm phẳng các trường quan trọng từ JSON response phức tạp.
        Xử lý nested objects (collection, genres, companies, countries, credits, etc.).
        Chuyển đổi các list thành chuỗi phân tách bằng dấu phẩy để dễ lưu vào CSV.
        Trích xuất certification (US), trailer key, external IDs, top 5 cast và directors.
    """
    out: Dict[str, Any] = {}
    basic = ["id","title","original_title","overview","release_date","original_language","imdb_id","budget","revenue","runtime","status","tagline","popularity","vote_count","vote_average"]
    for k in basic:
        out[k] = d.get(k)
    coll = d.get("belongs_to_collection") or {}
    out["collection_id"] = coll.get("id")
    out["collection_name"] = coll.get("name")
    out["genres"] = ",".join(g.get("name") for g in d.get("genres", []) if g.get("name"))
    out["production_companies"] = ",".join(pc.get("name") for pc in d.get("production_companies", []) if pc.get("name"))
    out["production_countries"] = ",".join(pc.get("iso_3166_1") for pc in d.get("production_countries", []) if pc.get("iso_3166_1"))
    out["spoken_languages"] = ",".join(sl.get("iso_639_1") for sl in d.get("spoken_languages", []) if sl.get("iso_639_1"))
    kws = d.get("keywords", {}).get("keywords", [])
    out["keywords"] = ",".join(k.get("name") for k in kws if k.get("name"))
    # Release certification (US)
    cert = None
    for country in d.get("release_dates", {}).get("results", []):
        if country.get("iso_3166_1") == "US":
            rels = country.get("release_dates", [])
            if rels:
                cert = rels[0].get("certification")
            break
    out["certification_US"] = cert
    # Trailer key
    vids = d.get("videos", {}).get("results", [])
    trailer = next((v for v in vids if v.get("type") == "Trailer"), None)
    out["trailer_key"] = trailer.get("key") if trailer else None
    # External IDs
    ext = d.get("external_ids", {})
    for k in ["facebook_id","instagram_id","twitter_id"]:
        out[k] = ext.get(k)
    # Credits (top cast + directors)
    cast = d.get("credits", {}).get("cast", [])
    crew = d.get("credits", {}).get("crew", [])
    out["cast_top5"] = ",".join(c.get("name") for c in cast[:5] if c.get("name"))
    directors = [c.get("name") for c in crew if c.get("job") == "Director" and c.get("name")]
    out["directors"] = ",".join(directors)
    return out


In [None]:
# 5. Checkpointed enrichment pipeline
CHECKPOINT_JSONL = pathlib.Path(f"{CHECKPOINT_PREFIX}.jsonl")
CHECKPOINT_CSV = pathlib.Path(f"{CHECKPOINT_PREFIX}.csv")


def load_processed_ids(jsonl_path: pathlib.Path) -> set:
    """
    Đọc danh sách ID phim đã được xử lý từ file checkpoint JSONL.
    
    Input:
        - jsonl_path (pathlib.Path): Đường dẫn đến file JSONL checkpoint
    
    Output:
        - set: Set chứa các movie ID đã được xử lý
    
    Mô tả:
        Hàm đọc file JSONL checkpoint và trích xuất các movie ID đã được xử lý.
        Dùng để tránh xử lý trùng lặp khi resume từ checkpoint. Nếu file không
        tồn tại, trả về set rỗng.
    """
    ids = set()
    if jsonl_path.exists():
        with open(jsonl_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    obj = json.loads(line)
                    mid = obj.get("id")
                    if mid is not None:
                        ids.add(mid)
                except Exception:
                    pass
    return ids


def flush_rows(rows: List[Dict[str, Any]], csv_path: pathlib.Path, jsonl_path: pathlib.Path, field_order: List[str]) -> List[str]:
    """
    Ghi buffer rows vào file checkpoint (JSONL và CSV).
    
    Input:
        - rows (List[Dict[str, Any]]): List các dictionary chứa dữ liệu phim cần ghi
        - csv_path (pathlib.Path): Đường dẫn file CSV output
        - jsonl_path (pathlib.Path): Đường dẫn file JSONL output
        - field_order (List[str]): Thứ tự các field cho CSV header
    
    Output:
        - List[str]: Thứ tự field đã được cập nhật (hoặc giữ nguyên)
    
    Mô tả:
        Hàm ghi batch data vào cả file JSONL (append) và CSV (append).
        Tự động xác định field order ở lần ghi đầu tiên. Ghi CSV header
        nếu file chưa tồn tại. Sau khi ghi xong, xóa buffer rows.
    """
    if not rows:
        return field_order
    # JSONL append
    with open(jsonl_path, "a", encoding="utf-8") as jf:
        for r in rows:
            jf.write(json.dumps(r, ensure_ascii=False))
            jf.write("\n")
    # Determine field order first time
    if not field_order:
        keys = set()
        for r in rows:
            keys.update(r.keys())
        field_order = sorted(keys)
    write_header = not csv_path.exists()
    with open(csv_path, "a", encoding="utf-8", newline="") as cf:
        writer = csv.DictWriter(cf, fieldnames=field_order)
        if write_header:
            writer.writeheader()
        for r in rows:
            writer.writerow({k: r.get(k) for k in field_order})
    rows.clear()
    return field_order


def enrich_movies(summary: List[Dict[str, Any]],
                  append_blocks: Sequence[str],
                  batch_size: int = 500,
                  rate_delay: float = 0.12) -> None:
    """
    Làm giàu dữ liệu phim từ danh sách summary bằng cách fetch detail và ghi checkpoint.
    
    Input:
        - summary (List[Dict[str, Any]]): Danh sách các dictionary chứa summary phim
        - append_blocks (Sequence[str]): Các block dữ liệu bổ sung cần fetch
          (ví dụ: ["credits", "keywords", "release_dates", "videos"])
        - batch_size (int): Số lượng rows để flush checkpoint (mặc định: 500)
        - rate_delay (float): Delay giữa các request detail (giây), mặc định: 0.12
    
    Output:
        - None (ghi trực tiếp vào file checkpoint)
    
    Mô tả:
        Hàm chính để làm giàu dữ liệu phim. Với mỗi phim trong summary:
        1. Check nếu đã xử lý (từ checkpoint) thì bỏ qua
        2. Fetch detail từ API
        3. Flatten detail thành format phẳng
        4. Thêm vào buffer và flush theo batch_size
        5. Ghi checkpoint định kỳ để hỗ trợ resume
        Hiển thị progress và thời gian xử lý khi hoàn thành.
    """
    processed_ids = load_processed_ids(CHECKPOINT_JSONL)
    buffer: List[Dict[str, Any]] = []
    field_order: List[str] = []
    start = time.time()
    for idx, mv in enumerate(summary, 1):
        mid = mv.get("id")
        if mid is None or mid in processed_ids:
            continue
        try:
            detail = fetch_movie_detail(int(mid), append_blocks)
            flat = flatten_detail(detail)
            buffer.append(flat)
            processed_ids.add(mid)
        except Exception as e:
            print(f"Error {mid}: {e}")
        if len(buffer) >= batch_size:
            field_order = flush_rows(buffer, CHECKPOINT_CSV, CHECKPOINT_JSONL, field_order)
            print(f"Checkpoint: {len(processed_ids)} enriched")
        if rate_delay:
            time.sleep(rate_delay)
    # final flush
    if buffer:
        field_order = flush_rows(buffer, CHECKPOINT_CSV, CHECKPOINT_JSONL, field_order)
    duration = time.time() - start
    print(f"Done. Total enriched: {len(processed_ids)} | Time: {duration/60:.2f} min")
    print(f"Files: {CHECKPOINT_JSONL}, {CHECKPOINT_CSV}")


In [26]:
# 6. Example: small sample run (safe quick test)
# Uncomment to test with a small number before full harvest.
SAMPLE_COUNT = 150
sample_summary = discover_movies(
    target_count=SAMPLE_COUNT,
    start_year=2022,
    end_year=datetime.now().year,
    per_page_delay=0.05,
)
print(f"Sample summary rows: {len(sample_summary)}")
enrich_movies(sample_summary, DETAIL_APPEND_BLOCKS, batch_size=50, rate_delay=0.10)


Sample summary rows: 150
Checkpoint: 200 enriched
Checkpoint: 200 enriched
Checkpoint: 250 enriched
Checkpoint: 250 enriched
Checkpoint: 300 enriched
Done. Total enriched: 300 | Time: 2.04 min
Files: movies_full.jsonl, movies_full.csv
Checkpoint: 300 enriched
Done. Total enriched: 300 | Time: 2.04 min
Files: movies_full.jsonl, movies_full.csv


In [25]:
# 7. Full harvest (summary + enrichment)
# Run when ready. Estimated time varies with network & rate limits.
# WARNING: This may take several minutes for 20k + detail enrichment.
# Uncomment the lines below to execute.

# summary_movies = discover_movies(
#     target_count=SUMMARY_TARGET,
#     start_year=SUMMARY_START_YEAR,
#     end_year=datetime.now().year,
#     per_page_delay=SUMMARY_DELAY,
# )
# print(f"Summary collected: {len(summary_movies)}")
# save_jsonl = lambda items, path: open(path, 'w', encoding='utf-8').write('\n'.join(json.dumps(it, ensure_ascii=False) for it in items))
# save_jsonl(summary_movies, 'movies_summary.jsonl')
# print('Saved movies_summary.jsonl')
# enrich_movies(summary_movies, DETAIL_APPEND_BLOCKS, batch_size=DETAIL_BATCH_SIZE, rate_delay=DETAIL_RATE_DELAY)


## 8. Usage & Performance Notes
- Summary endpoints return limited fields; enrichment adds budget, revenue, runtime, companies, countries, languages, collection, external IDs, credits.
- Adjust delays if you face HTTP 429 (rate limiting). Increase `SUMMARY_DELAY` / `DETAIL_RATE_DELAY`.
- Add blocks: include `images,recommendations,similar` in `DETAIL_APPEND_BLOCKS` for more breadth (heavier & slower).
- Resume support: rerunning enrichment will continue from existing JSONL/CSV checkpoint.
- For experimentation, start with small sample (Cell 6) before full run (Cell 7).
- Consider official TMDB bulk dataset (e.g. Kaggle mirrors) if you need more than 20k + faster turnaround.
- Remove any API key from code cells before sharing or committing.
