In [None]:
# Get list of movie IDs from the movieDB dataset
# Use todays_date formatted as mm_dd_YYYY
import concurrent.futures
import gzip
import os
import shutil
import sys
import time
from datetime import datetime, timedelta

import jsonlines
import pandas as pd
import requests
from dotenv import load_dotenv
from tqdm.notebook import tqdm

from game.movie import Movie
from game.neo4j_utils.neo4j_utils import GraphDbConnector
from game.person import Person

# Load variables from the local .env file into the process environment
load_dotenv()

# Bearer token for the TMDB API (None when the variable is not set)
TMDB_BEARER_TOKEN = os.environ.get("TMDB_BEARER_TOKEN")

# Today's date as mm_dd_YYYY, used to build the movie ID export file names
TODAY_DATE = f"{datetime.today():%m_%d_%Y}"

# Headers sent with every TMDB API request
TMDB_HEADERS = dict(
    accept="application/json",
    Authorization=f"Bearer {TMDB_BEARER_TOKEN}",
)

# Number of most-popular movies kept from the export
MOVIE_DB_SIZE = 15_000
# Maximum number of cast members kept per movie
ACTORS_PER_MOVIE = 10

In [None]:
# Directory to store movie ID files
movie_ids_directory = "./movie_ids/"
os.makedirs(movie_ids_directory, exist_ok=True)


def _movie_ids_export_date(filename: str) -> "datetime | None":
    """Return the date encoded in a ``movie_ids_mm_dd_YYYY.json`` file name.

    Args:
        filename: A bare file name (no directory part).

    Returns:
        The parsed date, or None when the name does not follow that pattern.
    """
    try:
        return datetime.strptime(filename, "movie_ids_%m_%d_%Y.json")
    except ValueError:
        return None


# Exports already on disk, oldest first. The mm_dd_YYYY stamp does not sort
# chronologically as plain text (12_31_2023 sorts after 01_01_2024), so the
# sort key is the parsed date. Files with any other name are ignored.
old_movie_ids_filenames: list[str] = sorted(
    (name for name in os.listdir(movie_ids_directory) if _movie_ids_export_date(name) is not None),
    key=_movie_ids_export_date,
)

# Newest export on disk, or None on a first run when the directory is empty
previous_movie_ids_path = (
    os.path.join(movie_ids_directory, old_movie_ids_filenames[-1])
    if old_movie_ids_filenames
    else None
)
latest_movie_ids_path = previous_movie_ids_path

# Refresh when there is no export yet, or the newest one is older than 14 days
needs_refresh = previous_movie_ids_path is None or (
    datetime.now() - datetime.fromtimestamp(os.path.getmtime(previous_movie_ids_path))
) > timedelta(days=14)

if needs_refresh:
    # Download movie IDs
    movie_ids_url = f"http://files.tmdb.org/p/exports/movie_ids_{TODAY_DATE}.json.gz"
    new_movie_ids_path = os.path.join(movie_ids_directory, f"movie_ids_{TODAY_DATE}.json")
    gz_path = f"movie_ids_{TODAY_DATE}.json.gz"
    # Decompress into a ".part" file first so an interrupted run never leaves a
    # truncated export that a later run would mistake for the latest one
    partial_path = new_movie_ids_path + ".part"

    r = requests.get(movie_ids_url, timeout=60)
    # Stop here with a clear HTTP error (e.g. today's export is not available)
    # rather than failing later while decompressing an error page
    r.raise_for_status()

    try:
        # Save the gzipped file
        with open(gz_path, "wb") as gz_file:
            gz_file.write(r.content)

        # Unzip the file and save the JSON
        with gzip.open(gz_path, "rb") as f_in, open(partial_path, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.replace(partial_path, new_movie_ids_path)
    finally:
        # Remove the temporary files whether or not the steps above succeeded
        for leftover_path in (gz_path, partial_path):
            if os.path.exists(leftover_path):
                os.remove(leftover_path)

    latest_movie_ids_path = new_movie_ids_path
    print("Movie IDs updated.")

    # If there was an older export, count the IDs that appear only in the new one.
    # Skipped when the download overwrote that same file.
    if previous_movie_ids_path is not None and previous_movie_ids_path != new_movie_ids_path:
        with jsonlines.open(latest_movie_ids_path) as reader:
            latest_movie_ids_set = {movie["id"] for movie in reader}
        with jsonlines.open(previous_movie_ids_path) as reader:
            old_movie_ids_set = {movie["id"] for movie in reader}
        # Set difference, not a difference of sizes: removed IDs must not
        # cancel out newly added ones
        new_movie_count: int = len(latest_movie_ids_set - old_movie_ids_set)
        print(f"{new_movie_count} new movie IDs found.")
        del latest_movie_ids_set, old_movie_ids_set, new_movie_count
else:
    print("Movie IDs are up to date.")

In [None]:
# Load the movie ID export (one JSON object per line) into a DataFrame.
# pandas is imported with the other dependencies in the first cell.
df = pd.read_json(latest_movie_ids_path, lines=True, orient="records")

In [None]:
# Rank by popularity (highest first) and keep the first MOVIE_DB_SIZE rows
popularity_ranked = df.sort_values(by="popularity", ascending=False)
df_filtered = popularity_ranked.iloc[:MOVIE_DB_SIZE]

In [None]:
# Check that the bearer token is accepted before starting the long crawl
url = "https://api.themoviedb.org/3/authentication"

# requests has no default timeout; without one a stalled connection would
# block this cell indefinitely
response = requests.get(url, headers=TMDB_HEADERS, timeout=30)

print(response.text)

In [None]:
# Seed one dict per selected movie with its id, title and popularity.
# The fetch functions defined further down add "release_date", "cast" and "crew"
# to these dicts: up to ACTORS_PER_MOVIE actors, plus the director, writer,
# director of photography and original music composer when present.

movies = [
    dict(id=row.id, title=row.original_title, popularity=row.popularity)
    for row in df_filtered.itertuples()
]

In [None]:
# Seconds to wait on a single TMDB request before giving up on it
_TMDB_TIMEOUT_SECONDS = 30
# On HTTP 429: how long to pause, and how many times to retry after pausing
_RATE_LIMIT_WAIT_SECONDS = 10
_RATE_LIMIT_RETRIES = 1
# Crew jobs kept for each movie; crew entries with any other job are dropped
_CREW_JOBS = frozenset(
    {"Director", "Writer", "Director of Photography", "Original Music Composer"}
)


def _tmdb_get_json(url: str, movie_id) -> "dict | None":
    """
    GET a TMDB endpoint and return its decoded JSON body.

    Args:
        url (str): Full endpoint URL
        movie_id: Movie id, used only in the printed error messages

    Returns:
        The decoded JSON body on HTTP 200. None (after printing the reason) when
        the request raises a requests error or returns any status other than 200.

    Raises:
        RuntimeError: if the endpoint still answers 429 after the configured retries.
    """
    for attempt in range(_RATE_LIMIT_RETRIES + 1):
        try:
            response = requests.get(url, headers=TMDB_HEADERS, timeout=_TMDB_TIMEOUT_SECONDS)
        except requests.RequestException as exc:
            # One unreachable movie should not abort the rest of the chunk
            print(f"Error for movie {movie_id}: {exc}")
            return None

        if response.status_code != 429:
            break
        # Rate limited: pause before the next attempt, if one is left
        if attempt < _RATE_LIMIT_RETRIES:
            time.sleep(_RATE_LIMIT_WAIT_SECONDS)
    else:
        # Every attempt was rate limited. Raise instead of calling sys.exit():
        # in a worker thread sys.exit() only ends that worker, without any trace.
        raise RuntimeError(f"Too many requests (HTTP 429) while fetching movie {movie_id}.")

    if response.status_code != 200:
        print(f"Error for movie {movie_id}: {response.status_code}")
        return None

    return response.json()


def fetch_movie_release_date(movie: dict) -> None:
    """
    Fetches the release date for a movie from the TMDB API and adds it to the movie dict.
    Endpoint: https://api.themoviedb.org/3/movie/{movie_id}

    Does nothing when the dict already has a "release_date" key. When the request
    fails, the dict is left unchanged.

    Args:
        movie (dict): A movie dict from the movies list

    Raises:
        RuntimeError: if TMDB keeps answering 429 after the retry.
    """
    # skip movies that were already queried
    if "release_date" in movie:
        return

    details = _tmdb_get_json(f"https://api.themoviedb.org/3/movie/{movie['id']}", movie["id"])
    if details is None:
        return

    movie["release_date"] = details["release_date"]


def fetch_movie_credits(movie: dict) -> None:
    """
    Fetches the cast and crew for a movie from the TMDB API and adds them to the movie dict.
    Credits endpoint: https://api.themoviedb.org/3/movie/{movie_id}/credits

    Stores the first ACTORS_PER_MOVIE cast entries under "cast" and the crew
    entries whose job is in _CREW_JOBS under "crew". Does nothing when both keys
    are already present. When the request fails, the dict is left unchanged.

    Args:
        movie (dict): A movie dict from the movies list

    Raises:
        RuntimeError: if TMDB keeps answering 429 after the retry.
    """
    # skip movies that were already queried
    if "cast" in movie and "crew" in movie:
        return

    credits_json = _tmdb_get_json(
        f"https://api.themoviedb.org/3/movie/{movie['id']}/credits", movie["id"]
    )
    if credits_json is None:
        return

    # select attributes for cast, only the first ACTORS_PER_MOVIE actors
    movie["cast"] = [
        {
            "id": member["id"],
            "name": member["name"],
            "popularity": member["popularity"],
            "known_for_department": member["known_for_department"],
        }
        for member in credits_json["cast"][:ACTORS_PER_MOVIE]
    ]

    # select attributes for crew, only the jobs of interest
    movie["crew"] = [
        {
            "id": member["id"],
            "name": member["name"],
            "popularity": member["popularity"],
            "known_for_department": member["known_for_department"],
            "job": member["job"],
        }
        for member in credits_json["crew"]
        if member["job"] in _CREW_JOBS
    ]


def create_movie_dict(movie: dict) -> None:
    """
    Completes a movie dict in place with its release date, cast and crew.

    Args:
        movie (dict): A movie dict from the movies list

    Returns:
        None. The movie dict passed in is modified.
    """
    fetch_movie_release_date(movie)
    fetch_movie_credits(movie)

In [None]:
# Split the movies into one chunk per worker thread
num_threads = 10
# Ceiling division, never below 1: plain floor division yields a step of 0
# (and a ValueError from range) when there are fewer movies than threads,
# and an extra undersized chunk when the count is not a multiple of num_threads
chunk_size = max(1, -(-len(movies) // num_threads))
movie_chunks = [movies[i : i + chunk_size] for i in range(0, len(movies), chunk_size)]


# Function to process a chunk of movies
def process_chunk(chunk):
    """Fetch release date and credits for every movie dict in ``chunk``, in place."""
    for movie in tqdm(chunk):
        create_movie_dict(movie)


# Use ThreadPoolExecutor for parallel processing
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    # Submit the chunks for processing
    futures = [executor.submit(process_chunk, chunk) for chunk in movie_chunks]

    # Wait for all threads to finish
    concurrent.futures.wait(futures)

# An exception raised inside a worker is stored on its future and is otherwise
# never shown; report it so a chunk that stopped early does not go unnoticed
for chunk_index, future in enumerate(futures):
    chunk_error = future.exception()
    if chunk_error is not None:
        print(f"Chunk {chunk_index} stopped early: {chunk_error!r}")

Takes about 15 minutes for 20k movies (subject to the API rate limit).


In [None]:
# Flatten the chunks back into a single list holding one dict per movie
movies_dicts = []
for chunk in movie_chunks:
    movies_dicts.extend(chunk)
len(movies_dicts)

In [None]:
# Persist the movie dicts to movies.jsonl, one JSON object per line
with jsonlines.open("movies.jsonl", mode="w") as writer:
    for movie_record in movies_dicts:
        writer.write(movie_record)

# Insert into DB


In [None]:
# When movies_dicts is not defined in this session, reload it from movies.jsonl
if "movies_dicts" not in locals():
    with jsonlines.open("movies.jsonl") as jsonl_reader:
        movies_dicts = [record for record in jsonl_reader]

In [None]:
# insert all the movies into the Cypher database.
# Should contain the following nodes:
# - Movie (id, title, popularity)
# - Person (id, name, popularity, known_for_department)

# Should contain the following relationships:
# - FEATURED_IN (Person)-[:FEATURED_IN]->(Movie) (whether as cast or crew)

# Connection settings are read from the environment (the .env file is loaded in
# the first cell) so real credentials stay out of the notebook. The fallbacks
# are the local development values.
# GraphDbConnector is imported with the other dependencies in the first cell.
graph = GraphDbConnector(
    os.getenv("NEO4J_URI", "bolt://localhost:7687"),
    database=os.getenv("NEO4J_DATABASE", "neo4j"),
    user=os.getenv("NEO4J_USER", "neo4j"),
    password=os.getenv("NEO4J_PASSWORD", "password"),
)

In [None]:
# Set up the database through the connector (creates the constraints).
# A failure is printed rather than raised, so the notebook keeps going.
try:
    graph.initialize_database()
except Exception as init_error:
    print(init_error)

In [None]:
# Insert all the movies and people into the database

# Build a dictionary of every person credited in the movies, keyed by person id.
# Movies without usable credits are printed and dropped from movies_dicts.
people = {}
movies_with_credits = []
for movie in movies_dicts:
    try:
        credited_people = {person["id"]: person for person in movie["cast"] + movie["crew"]}
    except (KeyError, TypeError):
        # "cast"/"crew" is missing or malformed (e.g. the credits request failed)
        print(movie)
        continue
    movies_with_credits.append(movie)
    for person_id, person in credited_people.items():
        # keep the first record seen for each person
        people.setdefault(person_id, person)

# Drop the rejected movies only after the loop: removing items from a list while
# iterating over it makes the loop skip the element after each removal.
# Slice assignment keeps movies_dicts the same list object.
movies_dicts[:] = movies_with_credits

In [None]:
for movie in tqdm(movies_dicts):  # 30 secs for 15k movies
    try:
        # Inside the try: a movie whose release-date request failed has no
        # "release_date" key, and must be reported like any other failed
        # insert instead of aborting the whole loop
        release_year = movie["release_date"].split("-")[0]
        graph.insert_movie(Movie(movie["id"], movie["title"], movie["popularity"], release_year))
    except Exception as insert_error:
        print(movie, repr(insert_error))

for person in tqdm(people.values()):  # 75k people, about 2 minutes
    graph.insert_person(Person(person["id"], person["name"], person["popularity"]))

In [None]:
# Insert all the relationships into the database

# Parallelize the inserts:
# split the movies into one chunk per worker thread (20 threads take about 3 mins)

num_threads = 20

# Ceiling division, never below 1: plain floor division yields a step of 0
# (and a ValueError from range) when there are fewer movies than threads
chunk_size = max(1, -(-len(movies_dicts) // num_threads))

movie_chunks = [movies_dicts[i : i + chunk_size] for i in range(0, len(movies_dicts), chunk_size)]


# Function to process a chunk of movies
def process_chunk(chunk):
    """Insert the person-to-movie relationships for every movie in ``chunk``.

    Cast members are inserted with the role "actor"; crew members with their
    own job as the role.
    """
    for movie in tqdm(chunk):
        for person in movie["cast"]:
            graph.insert_person_to_movie(
                Person(person["id"]), Movie(movie["id"]), person["known_for_department"], "actor"
            )
        for person in movie["crew"]:
            graph.insert_person_to_movie(
                Person(person["id"]), Movie(movie["id"]), person["known_for_department"], person["job"]
            )


# Use ThreadPoolExecutor for parallel processing
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    # Submit the chunks for processing
    futures = [executor.submit(process_chunk, chunk) for chunk in movie_chunks]

    # Wait for all threads to finish
    concurrent.futures.wait(futures)

# An exception raised inside a worker is stored on its future and is otherwise
# never shown; report it so missing relationships do not go unnoticed
for chunk_index, future in enumerate(futures):
    chunk_error = future.exception()
    if chunk_error is not None:
        print(f"Chunk {chunk_index} stopped early: {chunk_error!r}")

In [None]:
# All inserts are done: close the graph database connector
graph.close()