### Data Crawling

#### 1. Batch Processing Strategy
Since the ID list contains millions of entries, the crawling process is designed for **multi-day execution**. The dataset is partitioned into smaller batches (100,000 IDs per batch) to ensure systematic processing, maintain data integrity, and prevent data loss during unexpected interruptions.

#### 2. Runtime & Resource Management
When utilizing cloud environments like **Kaggle**, it is crucial to account for session time limits (12-hour window). Users should estimate the average processing time per batch and configure an optimal `RUN_LIMIT` to ensure the script terminates gracefully and saves progress before a session timeout occurs.

In [6]:
import os
from dotenv import load_dotenv
import requests
import json
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

In [7]:
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

TMDB_API_KEY = os.getenv("TMDB_API_KEY_MOVIE")

INPUT_PATH = "../../data/id_list/movie_ids_12_31_2025.jsonl"    # ids file path
OUTPUT_PATH = "../../data/movie/movie_12_31_2025.jsonl"         # output file path
MAX_WORKERS = 5                                                 # concurrency workers
WRITE_THRESHOLD = 100                                           # number of lines to write to file at once
IDX_RUN = 1                                                     # index of batch (first is 1)
RUN_LIMIT = 100000                                              # number of lines to process in a batch
START_FROM = (IDX_RUN - 1) * RUN_LIMIT                          # index of first line to process in a batch


# Validate the configuration with explicit raises: `assert` statements are
# removed when Python runs with -O, which would silently skip these checks.
if not TMDB_API_KEY:
    # Without a key, os.getenv returned None and every API call would be sent
    # with no credentials, so fail before any request is made.
    raise RuntimeError("Error: environment variable TMDB_API_KEY_MOVIE is not set")
if IDX_RUN < 1:
    # A batch index below 1 would make START_FROM negative.
    raise ValueError("Error: IDX_RUN needs to be at least 1")
if WRITE_THRESHOLD > RUN_LIMIT:
    raise ValueError("Error: WRITE_THRESHOLD needs to be smaller than or equal to RUN_LIMIT")

In [8]:
# Base URL for movie requests; the movie id (and any sub-path such as
# "/credits") is appended to it when a request is built.
url_movie_detail = "https://api.themoviedb.org/3/movie"

# HTTP headers sent with each request: ask for a JSON response body.
headers_detail = {
    "Accept": "application/json",
}

# Query-string parameters sent with each request.
params_details = {
    "api_key": TMDB_API_KEY,
    "language": "en-US",
    "append_to_response": "videos,images",
}


In [9]:
def get_movie_data(movie_id, timeout=30):
    """Get movie detail, cast and crew data from the API by movie id.

    Two GET requests are issued with the module-level ``headers_detail`` and
    ``params_details``: one to ``{url_movie_detail}/{movie_id}`` and one to
    ``{url_movie_detail}/{movie_id}/credits``.

    Args:
        movie_id: Id appended to ``url_movie_detail`` to build both URLs.
        timeout: Seconds passed to ``requests.get`` for each request, so a
            stalled connection raises instead of blocking the calling worker
            indefinitely.

    Returns:
        A dict with the keys ``movie_id``, ``movie_detail`` (the decoded JSON
        body of the detail request), ``casts_info`` and ``crews_info`` (lists
        of reduced cast / crew dicts; empty when the credits payload has no
        such entry).

    Raises:
        Whatever ``requests.get`` raises on connection failure or timeout, and
        whatever ``Response.json()`` raises when a body is not valid JSON.
    """
    # Get response from API
    movie_resp = requests.get(
        f"{url_movie_detail}/{movie_id}",
        headers=headers_detail,
        params=params_details,
        timeout=timeout,
    )
    credit_resp = requests.get(
        f"{url_movie_detail}/{movie_id}/credits",
        headers=headers_detail,
        params=params_details,
        timeout=timeout,
    )
    # NOTE: HTTP status codes are not checked here; whatever JSON the API
    # returned is decoded and used as-is.
    credit_info = credit_resp.json()

    # Prepare cast data
    cast_info = credit_info.get("cast")
    if cast_info is None:
        cast_list = []
        print("No cast found")
    else:
        cast_list = [
            {
                "person_id": info.get("id"),
                "known_for_department": info.get("known_for_department"),
                "cast_id": info.get("cast_id"),
                "character": info.get("character"),
                "credit_id": info.get("credit_id"),
            }
            for info in cast_info
        ]

    # Prepare crew data
    crew_info = credit_info.get("crew")
    if crew_info is None:
        # Only the crew list is reset here; the cast collected above must be
        # kept even when the payload has no crew entry.
        crew_list = []
        print("No crew found")
    else:
        crew_list = [
            {
                "person_id": info.get("id"),
                "known_for_department": info.get("known_for_department"),
                "department": info.get("department"),
                "job": info.get("job"),
            }
            for info in crew_info
        ]

    # Create final data
    return {
        "movie_id": movie_id,
        "movie_detail": movie_resp.json(),
        "casts_info": cast_list,
        "crews_info": crew_list,
    }

def count_lines(file_path):
    """Return how many lines the file at ``file_path`` contains.

    The file is opened in binary mode and iterated line by line, so no text
    decoding takes place while counting.
    """
    line_total = 0
    with open(file_path, "rb") as handle:
        for _ in handle:
            line_total += 1
    return line_total

In [10]:
def _append_jsonl(path, records):
    """Append each record to the file at ``path`` as one JSON document per line."""
    with open(path, "a", encoding="utf-8") as f_out:
        f_out.writelines(json.dumps(rec, ensure_ascii=False) + "\n" for rec in records)


# Read only this batch's slice of the id file: skip START_FROM lines, then take
# at most RUN_LIMIT lines (capped at the end of the file).
total_lines = count_lines(INPUT_PATH)
with open(INPUT_PATH, "r", encoding="utf-8") as f:
    batch_lines = islice(f, START_FROM, min(START_FROM + RUN_LIMIT, total_lines))
    batch_list = [json.loads(line) for line in batch_lines]

pbar = tqdm(total=len(batch_list), desc=f"Seeding Movie data, batch = {IDX_RUN}", unit="movie")
movie_ids = [item["id"] for item in batch_list]

temp_results = []        # fetched records not yet written to OUTPUT_PATH
idx_line = START_FROM    # position in the input file of the id being collected
try:
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(get_movie_data, movie_id) for movie_id in movie_ids]

        try:
            # Collect in submission order so output order follows the id file.
            for movie_id, future in zip(movie_ids, futures):
                try:
                    movie_data = future.result()
                except Exception as e:
                    # A failed fetch is reported with its id (so it can be
                    # retried later) and the crawl continues.
                    pbar.write(f"Error in thread (movie_id={movie_id}): {e}")
                else:
                    # File-write errors are deliberately not caught here: they
                    # are not per-movie failures and should stop the run.
                    if movie_data:
                        temp_results.append(movie_data)
                        if len(temp_results) >= WRITE_THRESHOLD:
                            _append_jsonl(OUTPUT_PATH, temp_results)
                            temp_results = []
                finally:
                    pbar.set_postfix({"idx_in_file": idx_line}, refresh=False)
                    pbar.update(1)
                    idx_line += 1
        except BaseException:
            # On an interrupt or fatal error, cancel work that has not started
            # yet; otherwise leaving the `with` block would wait for every
            # queued request to run before the cell stops.
            for pending in futures:
                pending.cancel()
            raise
finally:
    # Always persist what is still buffered (also when interrupted) and
    # release the progress bar.
    if temp_results:
        _append_jsonl(OUTPUT_PATH, temp_results)
    pbar.close()

print("Done!")

Seeding Person data, batch = 1:   0%|          | 0/500000 [00:00<?, ?person/s]

No cast found
No crew found


Seeding Person data, batch = 1:   0%|          | 16/500000 [00:07<40:54:44,  3.39person/s, idx_in_file=15]

KeyboardInterrupt: 