# IMDb 01 Prep (Movie-Only Catalog)

Objective:
- Build a movie-only IMDb catalog with TMDB plot enrichment.
- Aggregate top-3 AKA titles and infer multilingual language buckets.
- Produce `imdb_movies_catalog.csv`, `imdb_movies_meta.csv`, and prep stats in manifest.


In [None]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
import zipfile
import os

zip_file_path = '/content/drive/MyDrive/cinematch/Data/IMDB0226.zip'
extraction_path = '/content/drive/MyDrive/cinematch/Data/IMDB'

# Create the extraction directory if it doesn't exist
os.makedirs(extraction_path, exist_ok=True)

# Open the zip file in read mode
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    # Extract all the contents to the specified directory
    zip_ref.extractall(extraction_path)

print(f"'{zip_file_path}' extracted to '{extraction_path}' successfully.")


'/content/drive/MyDrive/cinematch/Data/IMDB0226.zip' extracted to '/content/drive/MyDrive/cinematch/Data/IMDB' successfully.


In [None]:
import gzip
import shutil
import os
from pathlib import Path

imdb_data_dir = Path('/content/drive/MyDrive/cinematch/Data/IMDB')

if not imdb_data_dir.is_dir():
    print(f"Error: Directory not found at {imdb_data_dir}")
else:
    tsv_gz_files = list(imdb_data_dir.glob('*.tsv.gz'))

    if not tsv_gz_files:
        print(f"No .tsv.gz files found in {imdb_data_dir}")
    else:
        print(f"Found {len(tsv_gz_files)} .tsv.gz files. Extracting...")
        for gz_file_path in tsv_gz_files:
            tsv_file_path = gz_file_path.with_suffix('') # Remove .gz suffix
            print(f"Extracting {gz_file_path.name} to {tsv_file_path.name}...")
            try:
                with gzip.open(gz_file_path, 'rb') as f_in:
                    with open(tsv_file_path, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                print(f"Successfully extracted {gz_file_path.name}")
            except Exception as e:
                print(f"Error extracting {gz_file_path.name}: {e}")
        print("All .tsv.gz file extraction process completed.")


Found 7 .tsv.gz files. Extracting...
Extracting name.basics.tsv.gz to name.basics.tsv...
Successfully extracted name.basics.tsv.gz
Extracting title.akas.tsv.gz to title.akas.tsv...
Successfully extracted title.akas.tsv.gz
Extracting title.basics.tsv.gz to title.basics.tsv...
Successfully extracted title.basics.tsv.gz
Extracting title.crew.tsv.gz to title.crew.tsv...
Successfully extracted title.crew.tsv.gz
Extracting title.episode.tsv.gz to title.episode.tsv...
Successfully extracted title.episode.tsv.gz
Extracting title.principals.tsv.gz to title.principals.tsv...
Successfully extracted title.principals.tsv.gz
Extracting title.ratings.tsv.gz to title.ratings.tsv...
Successfully extracted title.ratings.tsv.gz
All .tsv.gz file extraction process completed.


In [2]:
from __future__ import annotations

import csv
import json
import re
import sys
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, List, Set

import numpy as np
import pandas as pd

csv.field_size_limit(sys.maxsize)

IMDB_DIR = Path('/content/drive/MyDrive/cinematch/Data/IMDB')
TMDB_PATH = Path('/content/drive/MyDrive/cinematch/Data/TMDB_movie_dataset_v11.csv')
OUT_DIR = Path('/content/drive/MyDrive/cinematch/outputs/imdb')
OUT_DIR.mkdir(parents=True, exist_ok=True)

BASICS_PATH = IMDB_DIR / 'title.basics.tsv'
RATINGS_PATH = IMDB_DIR / 'title.ratings.tsv'
AKAS_PATH = IMDB_DIR / 'title.akas.tsv'

CATALOG_PATH = OUT_DIR / 'imdb_movies_catalog.csv'
META_PATH = OUT_DIR / 'imdb_movies_meta.csv'
MANIFEST_PATH = OUT_DIR / 'imdb_movies_build_manifest.json'

CHUNKSIZE_BASICS = 1_000_000
CHUNKSIZE_AKAS = 1_000_000
TOP_AKAS = 3

TARGET_LANGS = ['te', 'hi', 'ta', 'ja', 'ko']
TARGET_LANG_SET = set(TARGET_LANGS)

print('IMDb directory:', IMDB_DIR)
print('TMDB path:', TMDB_PATH)
print('Output directory:', OUT_DIR)


IMDb directory: /content/drive/MyDrive/cinematch/Data/IMDB
TMDB path: /content/drive/MyDrive/cinematch/Data/TMDB_movie_dataset_v11.csv
Output directory: /content/drive/MyDrive/cinematch/outputs/imdb


## Utility Functions

These helpers implement ranking for AKA titles, script detection fallback, language bucket inference, and `movieDoc` creation.


In [None]:
def clean_value(v: str) -> str:
    if v is None:
        return ''
    s = str(v).strip()
    return '' if s in {'', '\\N', 'nan', 'NaN', '<NA>'} else s


def clean_int(v: str):
    s = clean_value(v)
    if not s:
        return pd.NA
    try:
        return int(float(s))
    except Exception:
        return pd.NA


def clean_float(v: str):
    s = clean_value(v)
    if not s:
        return np.nan
    try:
        return float(s)
    except Exception:
        return np.nan


def contains_imdb_display(types_value: str) -> int:
    parts = [p.strip() for p in clean_value(types_value).split(',') if p.strip()]
    return 1 if 'imdbDisplay' in parts else 0


def rank_tuple(is_original: str, types_value: str, ordering: str, title: str):
    is_orig = 1 if clean_value(is_original) == '1' else 0
    has_imdb_display = contains_imdb_display(types_value)
    ord_val = clean_int(ordering)
    ord_num = int(ord_val) if ord_val is not pd.NA else 10**9
    return (-is_orig, -has_imdb_display, ord_num, title.casefold())


def update_top_k_candidates(candidates: List[dict], new_candidate: dict, k: int = 3) -> None:
    replaced = False
    for i, old in enumerate(candidates):
        if old['title'] == new_candidate['title']:
            if new_candidate['rank'] < old['rank']:
                candidates[i] = new_candidate
            else:
                if not old.get('language') and new_candidate.get('language'):
                    old['language'] = new_candidate['language']
                if not old.get('region') and new_candidate.get('region'):
                    old['region'] = new_candidate['region']
            replaced = True
            break

    if not replaced:
        candidates.append(new_candidate)

    candidates.sort(key=lambda x: x['rank'])
    if len(candidates) > k:
        del candidates[k:]


SCRIPT_RANGES = {
    'te': [(0x0C00, 0x0C7F)],
    'ta': [(0x0B80, 0x0BFF)],
    'hi': [(0x0900, 0x097F)],
    'ko': [(0x1100, 0x11FF), (0x3130, 0x318F), (0xAC00, 0xD7AF)],
    'ja': [(0x3040, 0x309F), (0x30A0, 0x30FF), (0x4E00, 0x9FFF)],
}


def has_script(text: str, ranges: list[tuple[int, int]]) -> bool:
    for ch in text:
        code = ord(ch)
        for lo, hi in ranges:
            if lo <= code <= hi:
                return True
    return False


def detect_script_bucket(text: str) -> str:
    t = clean_value(text)
    if not t:
        return ''
    for lang in ['te', 'ta', 'hi', 'ko', 'ja']:
        if has_script(t, SCRIPT_RANGES[lang]):
            return lang
    return ''



def infer_lang_bucket(
    aka_langs: Set[str],
    aka_regions: Set[str],
    akas_top3: str,
    primary_title: str,
    original_title: str,
    tmdb_original_language: str = '',
) -> str:
    # TMDB original_language â€” most reliable signal
    tmdb_lang = clean_value(tmdb_original_language)
    if tmdb_lang in TARGET_LANG_SET:
        return tmdb_lang

    # AKA language codes
    for lang in ['te', 'ta', 'hi', 'ja', 'ko']:
        if lang in aka_langs:
            return lang

    script_source = ' '.join([clean_value(akas_top3), clean_value(primary_title), clean_value(original_title)])
    script_bucket = detect_script_bucket(script_source)
    if script_bucket:
        return script_bucket

    if 'JP' in aka_regions:
        return 'ja'
    if 'KR' in aka_regions:
        return 'ko'
    if 'IN' in aka_regions:
        return 'other_non_en'

    if tmdb_lang and tmdb_lang != 'en':
        return 'other_non_en'

    if 'en' in aka_langs:
        return 'en'
    non_empty_langs = {l for l in aka_langs if clean_value(l)}
    if non_empty_langs:
        return 'other_non_en'

    if clean_value(primary_title) or clean_value(original_title):
        merged = clean_value(primary_title) + clean_value(original_title)
        try:
            merged.encode('ascii')
            return 'en'
        except UnicodeEncodeError:
            return 'unknown'

    return 'unknown'


def infer_origin_lang_bucket(
    original_title_langs: Set[str],
    aka_regions: Set[str],
    primary_title: str,
    original_title: str,
    tmdb_original_language: str = '',
) -> str:
    tmdb_lang = clean_value(tmdb_original_language)
    if tmdb_lang in TARGET_LANG_SET:
        return tmdb_lang
    for lang in ['te', 'ta', 'hi', 'ja', 'ko', 'en']:
        if lang in original_title_langs:
            return lang

    non_empty_original = {l for l in original_title_langs if clean_value(l)}
    if non_empty_original:
        return 'other_non_en'
    if tmdb_lang and tmdb_lang != 'en':
        return 'other_non_en'

    script_source = ' '.join([clean_value(original_title), clean_value(primary_title)])
    script_bucket = detect_script_bucket(script_source)
    if script_bucket:
        return script_bucket

    if 'JP' in aka_regions:
        return 'ja'
    if 'KR' in aka_regions:
        return 'ko'
    if 'IN' in aka_regions:
        return 'other_non_en'

    if clean_value(primary_title) or clean_value(original_title):
        merged = clean_value(primary_title) + clean_value(original_title)
        try:
            merged.encode('ascii')
            return 'en'
        except UnicodeEncodeError:
            return 'unknown'

    return 'unknown'


def movie_doc_from_row(row: pd.Series) -> str:
    title = clean_value(row.get('primaryTitle'))
    original = clean_value(row.get('originalTitle'))
    akas_top3 = clean_value(row.get('akas_top3'))
    year = clean_value(row.get('startYear'))
    runtime = clean_value(row.get('runtimeMinutes'))
    genres = clean_value(row.get('genres'))
    rating = row.get('averageRating')
    votes = row.get('numVotes')
    overview = clean_value(row.get('tmdb_overview'))

    parts = []
    parts.append(f'Title: {title if title else "Unknown"}')
    if original and original != title:
        parts.append(f'Original Title: {original}')
    if akas_top3:
        parts.append(f'AKA: {akas_top3}')
    if year:
        parts.append(f'Year: {year}')
    if runtime:
        parts.append(f'Runtime Minutes: {runtime}')
    if genres:
        parts.append(f'Genres: {genres}')

    rating_str = '' if pd.isna(rating) else f'{float(rating):.1f}'
    votes_str = '' if pd.isna(votes) else str(int(votes))
    if rating_str or votes_str:
        parts.append(f'IMDb Rating: {rating_str if rating_str else "NA"} (Votes: {votes_str if votes_str else "NA"})')

    if overview:
        parts.append(f'Plot: {overview}')
    else:
        fallback_plot = (
            'Plot: Metadata-only profile. '
            f'No external summary found. Genres: {genres if genres else "unknown"}; Year: {year if year else "unknown"}.'
        )
        parts.append(fallback_plot)

    return '\n'.join(parts)


## Load Movie Basics + Ratings + TMDB Enrichment


In [None]:
prep_started = time.time()

basic_cols = [
    'tconst',
    'titleType',
    'primaryTitle',
    'originalTitle',
    'isAdult',
    'startYear',
    'runtimeMinutes',
    'genres',
]

movie_parts = []
for chunk in pd.read_csv(
    BASICS_PATH,
    sep='\t',
    dtype=str,
    usecols=basic_cols,
    chunksize=CHUNKSIZE_BASICS,
    na_filter=False,
):
    movie_chunk = chunk.loc[chunk['titleType'] == 'movie', basic_cols].copy()
    if not movie_chunk.empty:
        movie_parts.append(movie_chunk)

movies = pd.concat(movie_parts, ignore_index=True)
movies = movies.sort_values('tconst').reset_index(drop=True)

for col in ['primaryTitle', 'originalTitle', 'genres']:
    movies[col] = movies[col].map(clean_value)

movies['isAdult'] = movies['isAdult'].map(clean_int).fillna(0).astype('int64')
movies['startYear'] = movies['startYear'].map(clean_int).astype('Int64')
movies['runtimeMinutes'] = movies['runtimeMinutes'].map(clean_int).astype('Int64')

ratings = pd.read_csv(RATINGS_PATH, sep='\t', dtype=str, na_filter=False)
ratings['averageRating'] = ratings['averageRating'].map(clean_float)
ratings['numVotes'] = ratings['numVotes'].map(clean_int).astype('Int64')
ratings = ratings[['tconst', 'averageRating', 'numVotes']]

movies = movies.merge(ratings, on='tconst', how='left')


tmdb_cols = ['id', 'imdb_id', 'title', 'overview', 'original_language',
             'vote_count', 'popularity', 'release_date']
tmdb = pd.read_csv(TMDB_PATH, usecols=tmdb_cols, low_memory=False)
tmdb['imdb_id'] = tmdb['imdb_id'].fillna('').astype(str).str.strip()
tmdb = tmdb[tmdb['imdb_id'].str.match(r'^tt\d+$', na=False)].copy()
tmdb['overview'] = tmdb['overview'].fillna('').astype(str).str.strip()
tmdb['has_overview'] = (tmdb['overview'].str.len() >= 20).astype(int)
tmdb['vote_count'] = pd.to_numeric(tmdb['vote_count'], errors='coerce').fillna(0)
tmdb['popularity'] = pd.to_numeric(tmdb['popularity'], errors='coerce').fillna(0)
tmdb['release_date'] = tmdb['release_date'].fillna('').astype(str)
tmdb['id'] = pd.to_numeric(tmdb['id'], errors='coerce').fillna(0)
tmdb['original_language'] = tmdb['original_language'].fillna('').astype(str).str.strip()

tmdb = tmdb.sort_values(
    ['has_overview', 'vote_count', 'popularity', 'release_date', 'id'],
    ascending=[False, False, False, False, False],
)
tmdb_best = tmdb.drop_duplicates(subset=['imdb_id'], keep='first').copy()
tmdb_best = tmdb_best.rename(
    columns={
        'imdb_id': 'tconst',
        'id': 'tmdb_id',
        'title': 'tmdb_title',
        'overview': 'tmdb_overview',
        'original_language': 'tmdb_original_language',   # NEW
    }
)
tmdb_best = tmdb_best[['tconst', 'tmdb_id', 'tmdb_title', 'tmdb_overview', 'tmdb_original_language']]

movies = movies.merge(tmdb_best, on='tconst', how='left')
movies['tmdb_overview'] = movies['tmdb_overview'].fillna('').astype(str).str.strip()
movies['tmdb_original_language'] = movies['tmdb_original_language'].fillna('').astype(str).str.strip()
movies['has_tmdb_plot'] = (movies['tmdb_overview'].str.len() >= 20)

print('Movies loaded:', len(movies))
print('Movies with TMDB overview:', int(movies['has_tmdb_plot'].sum()))
print('TMDB overview coverage:', round(float(movies['has_tmdb_plot'].mean()), 4))
print('Movies with TMDB original_language:', int((movies['tmdb_original_language'] != '').sum()))


Movies loaded: 737654
Movies with TMDB overview: 301717
TMDB overview coverage: 0.409
Movies with TMDB original_language: 351167


## Process AKA Titles (Top-3) + Infer Language Bucket + Build `movieDoc`


In [None]:
movie_set = set(movies['tconst'].tolist())

aka_top_map: Dict[str, List[dict]] = defaultdict(list)
aka_langs_map: Dict[str, Set[str]] = defaultdict(set)
aka_regions_map: Dict[str, Set[str]] = defaultdict(set)
aka_original_langs_map: Dict[str, Set[str]] = defaultdict(set)

akas_cols = ['titleId', 'ordering', 'title', 'region', 'language', 'types', 'isOriginalTitle']
rows_seen = 0
rows_matched_movies = 0

for chunk in pd.read_csv(
    AKAS_PATH,
    sep='	',
    dtype=str,
    usecols=akas_cols,
    chunksize=CHUNKSIZE_AKAS,
    na_filter=False,
):
    rows_seen += len(chunk)
    chunk = chunk[chunk['titleId'].isin(movie_set)]
    rows_matched_movies += len(chunk)
    if chunk.empty:
        continue

    for row in chunk.itertuples(index=False):
        tconst = clean_value(row.titleId)
        title = clean_value(row.title)
        if not tconst or not title:
            continue

        language = clean_value(row.language)
        region = clean_value(row.region)
        is_original = clean_value(row.isOriginalTitle) == '1'

        if language:
            aka_langs_map[tconst].add(language)
            if is_original:
                aka_original_langs_map[tconst].add(language)
        if region:
            aka_regions_map[tconst].add(region)

        candidate = {
            'rank': rank_tuple(row.isOriginalTitle, row.types, row.ordering, title),
            'title': title,
            'language': language,
            'region': region,
        }
        update_top_k_candidates(aka_top_map[tconst], candidate, k=TOP_AKAS)

print('AKA rows seen:', rows_seen)
print('AKA rows matched to movies:', rows_matched_movies)
print('Movies with at least one AKA row:', len(aka_top_map))

akas_top3_map = {}
akas_count_map = {}
for tconst, candidates in aka_top_map.items():
    titles = []
    seen_titles = set()
    for item in sorted(candidates, key=lambda x: x['rank']):
        if item['title'] not in seen_titles:
            titles.append(item['title'])
            seen_titles.add(item['title'])
    akas_top3_map[tconst] = ' | '.join(titles)
    akas_count_map[tconst] = len(titles)

movies['akas_top3'] = movies['tconst'].map(akas_top3_map).fillna('')
movies['akas_top3_count'] = movies['tconst'].map(akas_count_map).fillna(0).astype('int64')
movies['aka_langs'] = movies['tconst'].map(aka_langs_map).apply(lambda x: x if isinstance(x, set) else set())
movies['aka_regions'] = movies['tconst'].map(aka_regions_map).apply(lambda x: x if isinstance(x, set) else set())
movies['aka_original_langs'] = movies['tconst'].map(aka_original_langs_map).apply(lambda x: x if isinstance(x, set) else set())

movies['lang_bucket'] = movies.apply(
    lambda r: infer_lang_bucket(
        aka_langs=r['aka_langs'],
        aka_regions=r['aka_regions'],
        akas_top3=r['akas_top3'],
        primary_title=r['primaryTitle'],
        original_title=r['originalTitle'],
        tmdb_original_language=r.get('tmdb_original_language', ''),
    ),
    axis=1,
)

movies['origin_lang_bucket'] = movies.apply(
    lambda r: infer_origin_lang_bucket(
        original_title_langs=r['aka_original_langs'],
        aka_regions=r['aka_regions'],
        primary_title=r['primaryTitle'],
        original_title=r['originalTitle'],
        tmdb_original_language=r.get('tmdb_original_language', ''),
    ),
    axis=1,
)

movies['movieDoc'] = movies.apply(movie_doc_from_row, axis=1)

movies = movies.sort_values('tconst').reset_index(drop=True)
movies['row_id'] = movies.index.astype('int64')

movies['aka_langs_codes'] = movies['aka_langs'].apply(lambda s: ','.join(sorted(s)) if s else '')
movies['aka_regions_codes'] = movies['aka_regions'].apply(lambda s: ','.join(sorted(s)) if s else '')
movies['aka_original_langs_codes'] = movies['aka_original_langs'].apply(lambda s: ','.join(sorted(s)) if s else '')

catalog_cols = [
    'row_id',
    'tconst',
    'titleType',
    'primaryTitle',
    'originalTitle',
    'startYear',
    'runtimeMinutes',
    'genres',
    'isAdult',
    'averageRating',
    'numVotes',
    'tmdb_overview',
    'has_tmdb_plot',
    'akas_top3',
    'lang_bucket',
    'origin_lang_bucket',
    'tmdb_original_language',
    'movieDoc',
]

meta_cols = [
    'row_id',
    'tconst',
    'primaryTitle',
    'startYear',
    'genres',
    'averageRating',
    'numVotes',
    'lang_bucket',
    'origin_lang_bucket',
    'tmdb_original_language',
    'has_tmdb_plot',
]

catalog = movies[catalog_cols].copy()
meta = movies[meta_cols].copy()

catalog.to_csv(CATALOG_PATH, index=False)
meta.to_csv(META_PATH, index=False)

lang_counts = Counter(catalog['lang_bucket'])
origin_lang_counts = Counter(catalog['origin_lang_bucket'])
prep_seconds = time.time() - prep_started

manifest = {}
if MANIFEST_PATH.exists():
    manifest = json.loads(MANIFEST_PATH.read_text(encoding='utf-8'))

manifest['prep'] = {
    'timestamp_utc': pd.Timestamp.utcnow().isoformat(),
    'movie_rows': int(len(catalog)),
    'tmdb_plot_rows': int(catalog['has_tmdb_plot'].sum()),
    'tmdb_plot_coverage': float(catalog['has_tmdb_plot'].mean()),
    'lang_bucket_counts': {k: int(v) for k, v in lang_counts.items()},
    'origin_lang_bucket_counts': {k: int(v) for k, v in origin_lang_counts.items()},
    'debias_bucket_column': 'origin_lang_bucket',
    'inputs': {
        'title_basics': str(BASICS_PATH),
        'title_ratings': str(RATINGS_PATH),
        'title_akas': str(AKAS_PATH),
        'tmdb_csv': str(TMDB_PATH),
    },
    'outputs': {
        'catalog_csv': str(CATALOG_PATH),
        'meta_csv': str(META_PATH),
    },
    'chunk_sizes': {
        'basics': CHUNKSIZE_BASICS,
        'akas': CHUNKSIZE_AKAS,
    },
    'duration_seconds': round(prep_seconds, 2),
}

MANIFEST_PATH.write_text(json.dumps(manifest, indent=2), encoding='utf-8')

print('Saved catalog:', CATALOG_PATH)
print('Saved meta:', META_PATH)
print('Updated manifest:', MANIFEST_PATH)
print('Top availability language buckets:', dict(lang_counts.most_common(10)))
print('Top origin language buckets:', dict(origin_lang_counts.most_common(10)))


AKA rows seen: 54958521
AKA rows matched to movies: 3674721
Movies with at least one AKA row: 736117
Saved catalog: /content/drive/MyDrive/cinematch/outputs/imdb/imdb_movies_catalog.csv
Saved meta: /content/drive/MyDrive/cinematch/outputs/imdb/imdb_movies_meta.csv
Updated manifest: /content/drive/MyDrive/cinematch/outputs/imdb/imdb_movies_build_manifest.json
Top availability language buckets: {'en': 393262, 'other_non_en': 191827, 'ja': 92256, 'hi': 24205, 'unknown': 20712, 'ko': 8944, 'ta': 3709, 'te': 2739}
Top origin language buckets: {'en': 405045, 'other_non_en': 210600, 'ja': 76366, 'unknown': 24527, 'ko': 8825, 'hi': 6495, 'ta': 3386, 'te': 2410}


## Acceptance Checks


In [None]:
assert len(catalog) == len(movies), 'Catalog row count mismatch.'

assert catalog['row_id'].is_unique, 'row_id must be unique.'
assert int(catalog['row_id'].min()) == 0, 'row_id must start at 0.'
assert int(catalog['row_id'].max()) == len(catalog) - 1, 'row_id must be contiguous.'

assert not catalog['tconst'].duplicated().any(), 'Duplicate tconst found in catalog.'

max_akas = int(movies['akas_top3_count'].max())
assert max_akas <= TOP_AKAS, 'More than TOP_AKAS found in akas_top3.'

summary = {
    'catalog_rows': int(len(catalog)),
    'meta_rows': int(len(meta)),
    'tmdb_plot_rows': int(catalog['has_tmdb_plot'].sum()),
    'max_akas_per_movie': int(max_akas),
}
summary


{'catalog_rows': 737654,
 'meta_rows': 737654,
 'tmdb_plot_rows': 301717,
 'max_akas_per_movie': 3}