### **데이터 수집**
- 한국에서 제작 + 한국 극장 개봉 영화만 수집 -> tmdb_kr_theatrical_2005_2025.csv 저장

In [None]:
# -*- coding: utf-8 -*-
"""
한국에서 제작 + 한국 극장 개봉(2|3) 영화만 수집 → CSV 저장(진행률 표시)
기간: 2005-01-01 ~ 2025-06-30
"""

import os, time, datetime as dt
import requests
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv

# ===================== 설정 =====================
START_DATE = "2005-01-01"
END_DATE   = "2025-06-30"

LANG = "ko-KR"
BASE = "https://api.themoviedb.org/3"
SLEEP = 0.05                       # 요청 간 딜레이
TIMEOUT = 30

INCLUDE_ADULT = False              # 성인물 포함하려면 True
INCLUDE_VIDEO = False              # 보통 False 권장
VOTE_COUNT_MIN = 10                # 너무 마이너 작품 제거(원하면 20/50 등으로 조정)

OUT_CSV = "data/raw/tmdb_kr_theatrical_2005_2025.csv"

# ===================== 준비 =====================
load_dotenv()
API_KEY = os.getenv("TMDB_API_KEY")
if not API_KEY:
    raise SystemExit("❌ TMDB_API_KEY가 환경변수(.env)에 없습니다.")

os.makedirs(os.path.dirname(OUT_CSV), exist_ok=True)

# ===================== 공통 유틸 =====================
def get_json(url: str, params: dict, tries: int = 3):
    for i in range(tries):
        try:
            r = requests.get(url, params=params, timeout=TIMEOUT)
            if r.status_code in (429, 500, 502, 503, 504):
                time.sleep(min(2**i, 8)); continue
            r.raise_for_status()
            return r.json()
        except requests.RequestException:
            if i == tries - 1:
                raise
            time.sleep(min(2**i, 8))
    raise RuntimeError("Unreachable")

def common_params():
    return {
        "api_key": API_KEY,
        "language": LANG,
        "include_adult": str(INCLUDE_ADULT).lower(),
        "include_video": str(INCLUDE_VIDEO).lower(),
        "with_vote_count.gte": VOTE_COUNT_MIN,
        "with_origin_country": "KR",     # 🇰🇷 제작국 한국
        "region": "KR",                  # 🇰🇷 개봉 지역 한국
        "with_release_type": "2|3",      # 극장 개봉(제한/정식)
        "sort_by": "release_date.asc",   # 한국 개봉일 기준 정렬
        # release_date.gte/lte는 호출 시 주입
    }

# ===================== 1) ID 수집 =====================
def discover_total_pages(date_gte: str, date_lte: str) -> int:
    p = common_params()
    p.update({"release_date.gte": date_gte, "release_date.lte": date_lte, "page": 1})
    js = get_json(f"{BASE}/discover/movie", p)
    return int(js.get("total_pages", 1)), int(js.get("total_results", 0)), js.get("results", [])

def discover_ids_for_range(date_gte: str, date_lte: str, pbar=None):
    """범위를 /discover로 수집. total_pages<=500이면 한 번에, 아니면 호출자가 슬라이스."""
    p = common_params()
    p.update({"release_date.gte": date_gte, "release_date.lte": date_lte, "page": 1})
    first = get_json(f"{BASE}/discover/movie", p)
    total_pages = int(first.get("total_pages", 1))
    ids = [m["id"] for m in first.get("results", [])]
    if pbar: pbar.update(1)

    for page in range(2, min(total_pages, 500) + 1):
        p["page"] = page
        js = get_json(f"{BASE}/discover/movie", p)
        ids.extend([m["id"] for m in js.get("results", [])])
        if pbar: pbar.update(1)
        time.sleep(SLEEP)
    capped = total_pages > 500
    return ids, capped, total_pages

def year_slices(start: str, end: str):
    s = dt.date.fromisoformat(start); e = dt.date.fromisoformat(end)
    for y in range(s.year, e.year + 1):
        y0 = dt.date(y, 1, 1); y1 = dt.date(y, 12, 31)
        if y0 < s: y0 = s
        if y1 > e: y1 = e
        yield y0.isoformat(), y1.isoformat()

def collect_all_ids():
    print("→ ID 수집 범위:", START_DATE, "~", END_DATE)
    tp, tr, first_results = discover_total_pages(START_DATE, END_DATE)
    print(f"   예상 총 페이지: {tp} (총 편수 예상: {tr})")

    if tp <= 500:
        # 전 구간을 한 번에 수집 (페이지 기반 진행바)
        ids = [m["id"] for m in first_results]
        with tqdm(total=tp, desc="Discover pages (all range)", leave=False) as bar:
            bar.update(1)  # page=1 처리 반영
            more_ids, _, _ = discover_ids_for_range(START_DATE, END_DATE, pbar=bar)
            ids = more_ids  # discover_ids_for_range 안에서 page1부터 다시 처리했으므로 그걸 사용
        return sorted(set(ids))

    # 500 초과 → 연 단위 슬라이스 수집 (연 전체 페이지 합산해서 진행바 표시)
    # 먼저 각 연도의 페이지 수를 파악해 합산
    total_pages_sum = 0
    year_meta = []
    for yg, yl in year_slices(START_DATE, END_DATE):
        pages, _, _ = discover_total_pages(yg, yl)
        year_meta.append((yg, yl, pages))
        total_pages_sum += min(pages, 500)
    print(f"   연 단위 슬라이스 진행 (총 페이지 합산: {total_pages_sum})")

    all_ids = set()
    with tqdm(total=total_pages_sum, desc="Discover pages (by year)", leave=False) as bar:
        for yg, yl, pages in year_meta:
            ids_y, _, _ = discover_ids_for_range(yg, yl, pbar=bar)
            all_ids.update(ids_y)
            time.sleep(SLEEP)
    return sorted(all_ids)

# ===================== 2) 상세 수집 =====================
def fetch_detail(mid: int):
    params = {"api_key": API_KEY, "language": LANG}
    try:
        js = get_json(f"{BASE}/movie/{mid}", params)
        time.sleep(SLEEP)
        return js
    except requests.RequestException:
        return None

# ===================== 3) 정규화 → CSV =====================
def parse_genres(val):
    return [x.get("name") for x in val] if isinstance(val, list) else []

def normalize_rows(rows):
    df = pd.json_normalize(rows)

    out = pd.DataFrame({
        "movie_id": df.get("id"),
        "title": df.get("title"),
        "original_title": df.get("original_title"),
        "original_language": df.get("original_language"),
        "release_date": df.get("release_date"),
        "runtime": pd.to_numeric(df.get("runtime"), errors="coerce"),
        "budget": pd.to_numeric(df.get("budget"), errors="coerce"),
        "revenue": pd.to_numeric(df.get("revenue"), errors="coerce"),
        "vote_average": pd.to_numeric(df.get("vote_average"), errors="coerce"),
        "vote_count": pd.to_numeric(df.get("vote_count"), errors="coerce"),
        "popularity": pd.to_numeric(df.get("popularity"), errors="coerce"),
        "genres": df.get("genres").apply(parse_genres) if "genres" in df else [],
        "production_companies": df.get("production_companies").apply(
            lambda xs: [x.get("name") for x in xs] if isinstance(xs, list) else []
        ) if "production_companies" in df else [],
        "production_countries": df.get("production_countries").apply(
            lambda xs: [x.get("iso_3166_1") for x in xs] if isinstance(xs, list) else []
        ) if "production_countries" in df else [],
    })
    dt_series = pd.to_datetime(out["release_date"], errors="coerce")
    out["release_year"]  = dt_series.dt.year
    out["release_month"] = dt_series.dt.month
    return out

# ===================== 메인 =====================
def main():
    print("1) ID 수집 (🇰🇷 제작 ∩ 🇰🇷 극장 2|3)…")
    ids = collect_all_ids()
    print(f"   → 고유 ID 수: {len(ids):,}")

    print("2) 상세 수집 중…")
    rows = []
    with tqdm(total=len(ids), desc="Fetch details", leave=False) as bar:
        for mid in ids:
            d = fetch_detail(mid)
            if d: rows.append(d)
            bar.update(1)

    print("3) 정규화 및 CSV 저장…")
    df = normalize_rows(rows)
    df.to_csv(OUT_CSV, index=False, encoding="utf-8-sig")
    print(f"✅ 완료: {OUT_CSV} (rows={len(df):,})")

if __name__ == "__main__":
    main()


### **데이터 정제**
- 형변환, buget / revenue 0 인 행들 삭제. genre explode -> tmdb_kr_theatrical_clean_exploded.csv 저장

In [None]:
import pandas as pd
import ast

# === 1) CSV 로드 ===
df = pd.read_csv("./raw/tmdb_kr_theatrical_2005_2025.csv")

print("원본 shape:", df.shape)
display(df.head())
df.info()

# === 2) budget/revenue == 0 → 행 제거 ===
df = df[(df["budget"] != 0) & (df["revenue"] != 0)]

# === 3) ROI 계산 ===
df["roi"] = (df["revenue"] - df["budget"]) / df["budget"]

# === 4) release_date 날짜형 변환 ===
df["release_date"] = pd.to_datetime(df["release_date"], errors="coerce")

# === 5) release_year / release_month → int (nullable) 변환 ===
df["release_year"]  = df["release_year"].astype("Int64")
df["release_month"] = df["release_month"].astype("Int64")

# === 6) 필수 컬럼 결측 제거 ===
df = df.dropna(subset=["budget", "revenue", "release_date", "genres"])

# === 7) genres 문자열 → 리스트 파싱 ===
def parse_genres(x):
    if isinstance(x, list):
        return x
    if pd.isna(x):
        return []
    s = str(x).strip()
    # JSON/리스트 문자열이면 literal_eval
    if s.startswith("[") and s.endswith("]"):
        try:
            val = ast.literal_eval(s)
            # 리스트 안에 dict가 있을 수도 있음: {"name": "..."}
            out = []
            for item in val:
                if isinstance(item, dict):
                    out.append(item.get("name"))
                else:
                    out.append(str(item).strip())
            return [g for g in out if g]
        except Exception:
            pass
    # 혹시 "드라마, 로맨스"처럼 쉼표 구분일 수도 있음
    if "," in s:
        return [t.strip() for t in s.split(",") if t.strip()]
    # 단일 장르 문자열인 경우
    return [s] if s else []

df["genres"] = df["genres"].apply(parse_genres)

# === 8) 장르가 비어있는 행 제거 ===
df = df[df["genres"].apply(lambda lst: isinstance(lst, list) and len(lst) > 0)]

# === 9) 장르 explode ===
df_exploded = df.explode("genres", ignore_index=True)

# 빈 문자열/결측 방지
df_exploded = df_exploded[df_exploded["genres"].notna() & (df_exploded["genres"].str.strip() != "")]

# === 10) 저장 ===
df.to_csv("./data_processed/tmdb_kr_theatrical_clean.csv", index=False, encoding="utf-8-sig")
df_exploded.to_csv("./data_processed/tmdb_kr_theatrical_clean_exploded.csv", index=False, encoding="utf-8-sig")

# === 11) 확인 ===
print("explode 후 shape:", df_exploded.shape)
display(df_exploded[["title","genres","release_year","revenue","roi"]].head(10))
df_exploded.info()