# Ingesting the [TMDB](https://www.kaggle.com/datasets/tmdb/tmdb-movie-metadata) movie database into ApertureDB

This notebook works with an instance of ApertureDB, which can run in the [cloud](https://cloud.aperturedata.io) or as [local Docker container(s)](https://docs.aperturedata.io/Setup/server/Local).

The dataset is hosted on Kaggle and is available through a Croissant metadata link, which this notebook reads with the `mlcroissant` library.


In [None]:
# Install the third-party packages this notebook needs.
# NOTE(review): the PyPI distribution that provides the `dotenv` module is
# normally `python-dotenv`; `aperturedb` and `tqdm`, imported below, are not
# installed by this cell and are assumed to be present already - confirm.
%pip install --quiet mlcroissant pandas dotenv

## Import the required modules

In [None]:
import json
from typing import List

import mlcroissant as mlc
import pandas as pd

from tqdm import tqdm

from aperturedb.Subscriptable import Subscriptable
from aperturedb.ParallelLoader import ParallelLoader
from aperturedb.Query import QueryBuilder
from aperturedb.CommonLibrary import (
    create_connector
)
from aperturedb.Utils import Utils


## Load Croissant records into DataFrames

In [None]:
# Download the Croissant JSON-LD description of the Kaggle TMDB dataset.
croissant_url = 'https://www.kaggle.com/datasets/tmdb/tmdb-movie-metadata/croissant/download'
croissant_dataset = mlc.Dataset(croissant_url)

# List the record sets the dataset exposes (one per TMDB CSV file).
record_sets = croissant_dataset.metadata.record_sets
print(f"{record_sets=}")

def deserialize_record(record):
    """
    Turn a raw Croissant cell value into a Python object.

    ``bytes`` are decoded as UTF-8. The resulting string (or any string passed
    in) is parsed as JSON when it is valid JSON, so JSON-encoded columns such
    as cast, crew or genres become lists of dicts. Strings that are not valid
    JSON, and non-string values, are returned unchanged.

    Note that *any* valid JSON string is parsed, so e.g. ``"2012"`` becomes the
    int ``2012``; callers that need text wrap the value in ``str()``.

    Args:
        record: a single cell value from a Croissant record.

    Returns:
        The decoded/parsed value, or ``record`` itself when nothing applies.
    """
    value = record.decode('utf-8') if isinstance(record, bytes) else record
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            # Plain text (titles, overviews, ...) is not JSON: keep it as-is.
            pass
    return value

# Fetch the records and put them in DataFrames. Downloading the archive and
# turning it into records is handled by the croissant library.
# Croissant record sets map naturally onto DataFrames; TMDB has 2 of them.
# The merge further down expects record_sets[0] to hold the credits
# (movie_id, title, cast, crew) and record_sets[1] the movies; the two are
# associated through the movie id.
# Each record set is loaded exactly once and reused for the preview below.
record_set_dfs = [
    pd.DataFrame(croissant_dataset.records(record_set=record_set.uuid))
    for record_set in record_sets
]
record_set_df_0, record_set_df_1 = record_set_dfs[0], record_set_dfs[1]

# Display the first few records from each record set, with JSON-encoded
# cells deserialized so they print as nested structures.
for record_set_df in record_set_dfs:
    for _, row in record_set_df.head(5).iterrows():
        j = {c: deserialize_record(row[c]) for c in record_set_df.columns}
        print(json.dumps(j, indent=2, default=str))


In [None]:
# Connect to ApertureDB; create_connector() picks up the connection settings
# (how it is configured is not visible in this notebook).
# WARNING: utils.remove_all_objects() is expected to wipe all existing objects
# from the target database - only run this cell against an instance you can
# afford to clear. utils.summary() then presumably reports the (empty) state.
client=create_connector()
utils = Utils(client)
utils.remove_all_objects()
utils.summary()


In [None]:

def make_movie(j: dict) -> List[dict]:
    """
    Build the commands that add one movie together with its cast and crew.

    The MOVIE entity is bound to ``_ref`` 1. Every cast and crew member becomes
    a PROFESSIONAL entity (added with an ``if_not_found`` constraint on its
    ``id``) bound to the next free ``_ref``, and is linked from the movie by a
    ``CAST`` or ``CREW`` connection. The returned list is meant to be executed
    as a single transaction.

    Args:
        j (dict): one merged TMDB record keyed by ``"<csv file>/<column>"``.
            Reads ``tmdb_5000_credits.csv/{movie_id,title,cast,crew}`` and
            ``tmdb_5000_movies.csv/{budget,overview,popularity}``; ``cast``
            and ``crew`` must already be deserialized into lists of dicts.

    Returns:
        List[dict]: the commands, in execution order.
    """
    movie_id = j["tmdb_5000_credits.csv/movie_id"]
    movie_parameters = dict(_ref=1, properties=dict(
        id=str(movie_id),
        movie_id=movie_id,
        title=str(j["tmdb_5000_credits.csv/title"]),
        budget=j["tmdb_5000_movies.csv/budget"],
        overview=str(j["tmdb_5000_movies.csv/overview"]),
        popularity=j["tmdb_5000_movies.csv/popularity"]
    ), if_not_found=dict(id=["==", str(movie_id)]))
    transaction = [QueryBuilder.add_command("MOVIE", movie_parameters)]

    # _ref 1 is the movie; each professional takes the next free _ref.
    index = 2
    for c in j["tmdb_5000_credits.csv/cast"]:
        cast_parameters = dict(_ref=index, properties=dict(
            id=c["id"],
            name=c["name"],
            gender=c["gender"]), if_not_found=dict(id=["==", c["id"]]))
        transaction.append(QueryBuilder.add_command("PROFESSIONAL", cast_parameters))

        connection_parameters = dict(src=1, dst=index, properties=dict(
            character=c["character"],
            cast_id=c["cast_id"]))
        connection_parameters["class"] = "CAST"
        transaction.append(QueryBuilder.add_command("_Connection", connection_parameters))
        index += 1

    for c in j["tmdb_5000_credits.csv/crew"]:
        crew_parameters = dict(_ref=index, properties=dict(
            id=c["id"],
            name=c["name"],
            gender=c["gender"]
        ), if_not_found=dict(id=["==", c["id"]]))
        transaction.append(QueryBuilder.add_command("PROFESSIONAL", crew_parameters))

        connection_parameters = dict(src=1, dst=index, properties=dict(
            department=c["department"],
            job=c["job"],
            credit_id=c["credit_id"]))
        # Set the class *before* add_command builds the command (as in the
        # cast loop), so the CREW class is part of the command.
        connection_parameters["class"] = "CREW"
        transaction.append(QueryBuilder.add_command("_Connection", connection_parameters))
        index += 1

    return transaction


In [None]:
# (column of the merged record, entity class, if_not_found key, connection
# class) for the movie attributes that become simple linked entities.
_MOVIE_LINK_COLUMNS = (
    ("tmdb_5000_movies.csv/genres", "GENRE", "id", "HAS_GENRE"),
    ("tmdb_5000_movies.csv/production_companies", "PRODUCTION_COMPANY", "id",
     "HAS_PRODUCTION_COMPANY"),
    ("tmdb_5000_movies.csv/keywords", "KEYWORD", "id", "HAS_KEYWORD"),
    ("tmdb_5000_movies.csv/spoken_languages", "SPOKEN_LANGUAGE", "iso_639_1",
     "HAS_SPOKEN_LANGUAGE"),
)


def _append_movie_link(transaction: List[dict], index: int, entity_class: str,
                       properties: dict, key: str, connection_class: str,
                       connection_properties: dict) -> None:
    """Append an add of one related entity bound to ``_ref`` *index* (with an
    ``if_not_found`` constraint on *key*) plus a *connection_class* connection
    from the movie (``_ref`` 1) to it."""
    entity_parameters = dict(_ref=index, properties=properties,
                             if_not_found={key: ["==", properties[key]]})
    transaction.append(QueryBuilder.add_command(entity_class, entity_parameters))

    connection_parameters = dict(src=1, dst=index, properties=connection_properties)
    # The class must be set before add_command builds the command.
    connection_parameters["class"] = connection_class
    transaction.append(QueryBuilder.add_command("_Connection", connection_parameters))


def make_movie_2(j: dict) -> List[dict]:
    """
    Build the commands that add one movie and everything linked to it.

    The MOVIE entity is bound to ``_ref`` 1; each related entity gets the next
    free ``_ref`` and is linked from the movie as follows:

        source column                              entity class        connection
        tmdb_5000_credits.csv/cast                 PROFESSIONAL        CAST
        tmdb_5000_credits.csv/crew                 PROFESSIONAL        CREW
        tmdb_5000_movies.csv/genres                GENRE               HAS_GENRE
        tmdb_5000_movies.csv/production_companies  PRODUCTION_COMPANY  HAS_PRODUCTION_COMPANY
        tmdb_5000_movies.csv/keywords              KEYWORD             HAS_KEYWORD
        tmdb_5000_movies.csv/spoken_languages      SPOKEN_LANGUAGE     HAS_SPOKEN_LANGUAGE

    Related entities are added with an ``if_not_found`` constraint on ``id``
    (``iso_639_1`` for spoken languages), intended to let entities shared
    between movies be reused rather than duplicated. Every entity gets a
    ``label`` property and every connection a ``name`` property holding its
    class, since the class cannot be read back from entities in ADB.
    The returned list is meant to be executed as a single transaction.

    Args:
        j (dict): one merged TMDB record keyed by ``"<csv file>/<column>"``,
            with its JSON columns (cast, crew, genres, ...) already
            deserialized into lists of dicts.

    Returns:
        List[dict]: the commands, in execution order.
    """
    movie_id = j["tmdb_5000_credits.csv/movie_id"]
    title = str(j["tmdb_5000_credits.csv/title"])
    movie_parameters = dict(_ref=1, properties=dict(
        id=str(movie_id),
        movie_id=movie_id,
        title=title,
        budget=j["tmdb_5000_movies.csv/budget"],
        overview=str(j["tmdb_5000_movies.csv/overview"]),
        popularity=j["tmdb_5000_movies.csv/popularity"],
        tagline=str(j["tmdb_5000_movies.csv/tagline"]),
        vote_average=j["tmdb_5000_movies.csv/vote_average"],
        vote_count=j["tmdb_5000_movies.csv/vote_count"],
        # Adding this as there is no way to get the class from entities in ADB.
        label="MOVIE",
        # NOTE(review): str.capitalize() lower-cases everything after the first
        # character ("The Dark Knight" -> "The dark knight"); the same applies
        # to professional names below. Kept so stored values do not change.
        name=title.capitalize()
    ), if_not_found=dict(id=["==", str(movie_id)]))
    transaction = [QueryBuilder.add_command("MOVIE", movie_parameters)]

    # _ref 1 is the movie; every linked entity takes the next free _ref.
    index = 2
    for c in j["tmdb_5000_credits.csv/cast"]:
        _append_movie_link(
            transaction, index, "PROFESSIONAL",
            dict(id=c["id"], name=c["name"].capitalize(), gender=c["gender"],
                 label="PROFESSIONAL"),
            "id", "CAST",
            dict(character=c["character"], cast_id=c["cast_id"], name="CAST"))
        index += 1

    for c in j["tmdb_5000_credits.csv/crew"]:
        _append_movie_link(
            transaction, index, "PROFESSIONAL",
            dict(id=c["id"], name=c["name"].capitalize(), gender=c["gender"],
                 label="PROFESSIONAL"),
            "id", "CREW",
            dict(department=c["department"], job=c["job"],
                 credit_id=c["credit_id"], name="CREW"))
        index += 1

    for column, entity_class, key, connection_class in _MOVIE_LINK_COLUMNS:
        for item in j[column]:
            _append_movie_link(
                transaction, index, entity_class,
                {key: item[key], "name": item["name"], "label": entity_class},
                key, connection_class, dict(name=connection_class))
            index += 1

    return transaction

In [None]:

# Merge the two DataFrames on the movie id: record_set_df_0 holds the credits
# columns and record_set_df_1 the movie columns. merge() defaults to an inner
# join, so movies missing from either table are dropped.
records = record_set_df_0.merge(
    record_set_df_1,
    right_on="tmdb_5000_movies.csv/id",
    left_on="tmdb_5000_credits.csv/movie_id")

# Build one transaction per movie. Every cell is deserialized first so the
# JSON-encoded columns (cast, crew, genres, ...) become lists of dicts.
columns = records.columns
collection = []
for _, row in tqdm(records.iterrows(), total=len(records)):
    j = {c: deserialize_record(row[c]) for c in columns}
    collection.append(make_movie_2(j))


class MovieParser(Subscriptable):
    """
    Expose pre-built movie transactions to ``ParallelLoader``.

    Item *key* is the transaction stored at that position (a list of commands
    such as those produced by ``make_movie_2``) paired with an empty blob
    list, because none of these commands carries binary data.
    """

    def __init__(self, collection):
        # Sequence of per-movie command lists, addressed by position.
        self.collection = collection

    def __len__(self):
        return len(self.collection)

    def getitem(self, key):
        commands = self.collection[key]
        blobs = []
        return commands, blobs


# Index the properties used by the if_not_found constraints in make_movie_2,
# so the conditional adds can find existing entities through an index.
for entity_class, key in (
    ("MOVIE", "id"),
    ("PROFESSIONAL", "id"),
    ("GENRE", "id"),
    ("PRODUCTION_COMPANY", "id"),
    ("KEYWORD", "id"),
    ("SPOKEN_LANGUAGE", "iso_639_1"),
):
    utils.create_entity_index(entity_class, key)

utils.create_connection_index("CAST", "cast_id")
# CREW connections carry a `credit_id` property; there is no `crew_id`.
utils.create_connection_index("CREW", "credit_id")

parser = MovieParser(collection)
loader = ParallelLoader(client)
# Accept status 2 as well as 0 - presumably the status reported when an
# if_not_found add matches an existing entity; TODO confirm in ApertureDB docs.
# NOTE(review): with 8 threads, two transactions may add the same shared
# entity concurrently; verify that if_not_found still prevents duplicates.
ParallelLoader.setSuccessStatus([0, 2])
loader.ingest(parser, batchsize=100, numthreads=8, stats=True)
