In [2]:
import os
import pandas as pd
import numpy as np
from datetime import datetime
import json
from pathlib import Path

In [3]:
# ========= CONFIG =========
source_system_tmdb = 'tmdb'
source_system_omdb = 'omdb'

# =========  DATA EXTRACTION =========
# Path to the folder containing the daily collected api outputs saved as json files
api_output_collect_dir_tmdb = os.path.join('..','..', 'data','raw','tmdb','2025','05','25','tmdb')
raw_json_file_tmdb = f'__all_{source_system_tmdb}_api_output_collected.json'
raw_json_file_path_tmdb = os.path.join(api_output_collect_dir_tmdb, raw_json_file_tmdb)

api_output_collect_dir_omdb = os.path.join('..','..', 'data','raw','omdb')
raw_json_file_omdb = f'__all_{source_system_omdb}_api_output_collected.json'
raw_json_file_path_omdb = os.path.join(api_output_collect_dir_omdb, raw_json_file_omdb)

# ========= JSON =========
json_data = []

# Iterate over each json file
for filename in os.listdir(api_output_collect_dir_tmdb):
    # CORRECTED LINE: Adjust the startswith condition
    if filename.startswith(f'{source_system_tmdb}_imdb_mapping_') and filename.endswith('.json'):
        file_path = os.path.join(api_output_collect_dir_tmdb, filename)
        try:
            with open(file_path, 'r') as file:
                print(file_path)
                content = json.load(file)
                # Process each api response in the list
                # Assuming 'content' is a dictionary for a single movie's data
                entry = {
                    "budget": content.get("budget"),
                    "tmdb_id": content.get("id"),
                    "imdb_id": content.get("imdb_id"),
                    "release_date": content.get("release_date"),
                    "revenue": content.get("revenue"),
                    "vote_average": content.get("vote_average"),
                    "vote_count": content.get("vote_count")
                }
                json_data.append(entry)
        except json.JSONDecodeError:
            print(f"Warning: Could not decode JSON from {filename}. It might be empty or corrupted.")
        except Exception as e:
            print(f"An unexpected error occurred while processing {filename}: {e}")

# Write the summarized relevant data to a single json file
overall_collect_file_tmdb = f'__all_{source_system_tmdb}_api_output_collected.json'
overall_collect_file_path_tmdb = os.path.join(api_output_collect_dir_tmdb, overall_collect_file_tmdb)
with open(overall_collect_file_path_tmdb, 'w') as output_file:
    json.dump(json_data, output_file, indent=4) # Added indent for readability

# Check json output contents
df_tmdb_transform = pd.read_json(overall_collect_file_path_tmdb)

# Only retain relevant columns. Column 'imdb_id' holds the unique imdb key for each movie title 
# that links to the imdb, omdb and tmdb datasets together
selected_column_names = ['budget', 'tmdb_id', 'imdb_id', 'release_date', 'revenue', 'vote_average', 'vote_count']
new_column_names = [f'{source_system_tmdb}_budget', f'{source_system_tmdb}_id', f'{source_system_tmdb}_imdb_id', f'{source_system_tmdb}_release_date', f'{source_system_tmdb}_revenue', 
                        f'{source_system_tmdb}_vote_average', f'{source_system_tmdb}_vote_count']
columns_dict = dict(zip(selected_column_names, new_column_names))
df_movie_box_office_tmdb = df_tmdb_transform[selected_column_names].rename(columns=columns_dict).reset_index(drop=True)

# Write dataframe to ETL transformation layer
transformed_dir = os.path.join('..','..','data', 'raw','box_office_mapping')

# Create the destination directory if it doesn't exist
os.makedirs(transformed_dir, exist_ok=True)

transformed_file_tmdb = f'{source_system_tmdb}_movie_box_office.csv'
transformed_file_path_tmdb = os.path.join(transformed_dir, transformed_file_tmdb)
df_movie_box_office_tmdb.to_csv(transformed_file_path_tmdb, index=False)

df_omdb_transform = pd.read_json(raw_json_file_path_omdb)

# Only retain rows that have actual box office info
df_movie_box_office = df_omdb_transform[df_omdb_transform['BoxOffice'].notna() & df_omdb_transform['BoxOffice'].str.startswith('$')].reset_index(drop=True)

# Convert remaining 'BoxOffice' values to integers by removing '$' and commas
df_movie_box_office['BoxOffice'] = df_movie_box_office['BoxOffice'].str.replace('$', '').str.replace(',', '').astype('Int64')

# Only retain relevant columns. Column 'imdbID' holds the unique IMDB key for each movie title 
# that links to the imdb dataset files 
selected_column_names = ['imdbID', 'BoxOffice']
new_column_names = [f'{source_system_omdb}_imdb_id', f'{source_system_omdb}_box_office']
columns_dict = dict(zip(selected_column_names, new_column_names))
df_movie_box_office_omdb = df_movie_box_office[selected_column_names].rename(columns=columns_dict).reset_index(drop=True)

transformed_file_omdb = f'{source_system_omdb}_movie_box_office.csv'
transformed_file_path_omdb = os.path.join(transformed_dir, transformed_file_omdb)

df_movie_box_office_omdb.to_csv(transformed_file_path_omdb, index=False)

../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0035423_20250525_121737.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0069049_20250525_121737.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0082328_20250525_121736.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0088751_20250525_121737.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0096056_20250525_121737.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0104988_20250525_121736.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0108116_20250525_121737.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0109173_20250525_121737.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0110476_20250525_121736.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0113092_20250525_121737.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0114447_20250525_121737.json
../../data/raw/tmdb/2025/05/25/tmdb/tmdb_imdb_mapping_tt0114722_2

In [34]:
# =========  CONFIG =========
LOAD_DIR = os.path.join('..', '..', 'data', 'cleaned')
os.makedirs(LOAD_DIR, exist_ok=True)

In [None]:
# =========  DATA EXTRACTION =========
df_movies = pd.read_csv('../../data/raw/imdb/imdb_movie_titles.csv')
df_movie_imdb_ratings = pd.read_csv('../../data/raw/imdb/imdb_movie_ratings.csv')
df_omdb_movie_box_office = pd.read_csv('../../data/raw/box_office_mapping/omdb_movie_box_office.csv')
df_tmdb_movie_box_office = pd.read_csv('../../data/raw/box_office_mapping/tmdb_movie_box_office.csv')
df_movie_principals = pd.read_csv('../../data/raw/imdb/imdb_movie_principals.csv')
df_movie_persons = pd.read_csv('../../data/raw/imdb/imdb_movie_persons.csv')

# Apply cut-off dates to tmdb_movie_box_office
cutoff_date = datetime(datetime.now().year, 1, 1)
df_tmdb_movie_box_office['tmdb_release_date'] = pd.to_datetime(df_tmdb_movie_box_office['tmdb_release_date'])
df_tmdb_movie_box_office = df_tmdb_movie_box_office[
    (df_tmdb_movie_box_office['tmdb_release_date'] >= '2000-01-01') &
    (df_tmdb_movie_box_office['tmdb_release_date'] < cutoff_date)
].copy()
df_fact_movies = df_movies[['movie_imdb_id', 'movie_duration_minutes', 'movie_release_year']].copy()

# Merge all relevant dataframes
df_fact_movies = (df_fact_movies.merge(df_movie_imdb_ratings, how='left', on='movie_imdb_id')
							    .merge(df_omdb_movie_box_office, how='left', left_on='movie_imdb_id', right_on='omdb_imdb_id')
    							.merge(df_tmdb_movie_box_office, how='left', left_on='movie_imdb_id', right_on='tmdb_imdb_id')
                  )

# Replace 0 with NaN for numerical columns where 0 represents missing data
df_fact_movies = df_fact_movies.replace(0, np.nan)

# Consolidate release dates
df_fact_movies['release_date_imdb_fallback'] = pd.to_datetime(df_fact_movies['movie_release_year'].astype(str) + '-01-01')
df_fact_movies['movie_release_date'] = df_fact_movies['tmdb_release_date'].combine_first(df_fact_movies['release_date_imdb_fallback'])
df_fact_movies['movie_release_date_id'] = df_fact_movies['movie_release_date'].dt.strftime("%Y%m%d")

# Find min and max year for calendar table
calendar_min_year = df_fact_movies['movie_release_date'].dt.year.min()
calendar_max_year = df_fact_movies['movie_release_date'].dt.year.max()

# Consolidate box office figures
df_fact_movies['movie_box_office_$'] = df_fact_movies['tmdb_revenue'].combine_first(df_fact_movies['omdb_box_office'])

# Consolidate ratings and apply cap
df_fact_movies['movie_rating'] = df_fact_movies['movie_imdb_rating'].combine_first(df_fact_movies['tmdb_vote_average'])
df_fact_movies['movie_rating'] = np.minimum(df_fact_movies['movie_rating'], 10.0)

# Consolidate vote counts
df_fact_movies['movie_nof_votes'] = df_fact_movies['movie_imdb_nof_votes'].combine_first(df_fact_movies['tmdb_vote_count'])

# Rename budget column
df_fact_movies = df_fact_movies.rename(columns={'tmdb_budget': 'movie_budget_$'})

# Drop intermediate and redundant columns
columns_to_drop_fact = ['omdb_imdb_id', 'tmdb_id', 'tmdb_imdb_id', 'tmdb_release_date', 'tmdb_revenue',
                        'omdb_box_office', 'movie_release_year', 'release_date_imdb_fallback',
                        'movie_imdb_rating', 'tmdb_vote_average', 'movie_imdb_nof_votes','tmdb_vote_count'
                        ]
df_fact_movies = df_fact_movies.drop(columns=columns_to_drop_fact)

# Add flags
df_fact_movies['has_budget'] = df_fact_movies['movie_budget_$'].notnull().astype(int)
df_fact_movies['has_box_office'] = df_fact_movies['movie_box_office_$'].notnull().astype(int)
df_fact_movies['has_budget_box_office'] = ((df_fact_movies['movie_budget_$'].notnull()) &
                                           (df_fact_movies['movie_box_office_$'].notnull())).astype(int)
df_fact_movies['has_rating'] = df_fact_movies['movie_rating'].notnull().astype(int)
df_fact_movies['has_votes'] = df_fact_movies['movie_nof_votes'].notnull().astype(int)
df_fact_movies['has_rating_votes'] = ((df_fact_movies['movie_rating'].notnull()) &
                                      (df_fact_movies['movie_nof_votes'].notnull())).astype(int)

# Save
df_fact_movies.to_csv(os.path.join(LOAD_DIR, 'fact_movies.csv'), index=False)

In [35]:
# =========  DIM_MOVIE_TITLES =========
df_dim_movie_titles = df_movies[['movie_imdb_id', 'movie_title', 'movie_release_year']].copy()

df_dim_movie_titles = df_dim_movie_titles.merge(df_tmdb_movie_box_office[['tmdb_imdb_id', 'tmdb_release_date']],
                                                how='left', left_on='movie_imdb_id', right_on='tmdb_imdb_id')

df_dim_movie_titles['release_date_imdb_fallback'] = pd.to_datetime(df_dim_movie_titles['movie_release_year'].astype(str) + '-01-01')
df_dim_movie_titles['movie_release_date'] = (df_dim_movie_titles['tmdb_release_date']
                                             .combine_first(df_dim_movie_titles['release_date_imdb_fallback']))
df_dim_movie_titles = df_dim_movie_titles[['movie_imdb_id', 'movie_title', 'movie_release_date']]

# Save df_dim_movie_titles
df_dim_movie_titles.to_csv(os.path.join(LOAD_DIR, 'dim_movie_titles.csv'), index=False)


In [None]:
# =========  DIM_MOVIE_CATEGORIES & BRIDGE_MOVIE_CATEGORIE =========
df_movies_exploded_categories = (df_movies.assign(movie_category=df_movies['movie_categories'].str.split(',')).explode('movie_category').copy())
df_dim_movie_categories = pd.DataFrame(df_movies_exploded_categories['movie_category'].unique(), columns=['movie_category'])
df_dim_movie_categories = df_dim_movie_categories.sort_values('movie_category').reset_index(drop=True)
df_dim_movie_categories['movie_category_id'] = ('cat' + (df_dim_movie_categories.index + 1).astype(str))
df_dim_movie_categories = df_dim_movie_categories[['movie_category_id', 'movie_category']]

# Save df_dim_movie_categories
df_dim_movie_categories.to_csv(os.path.join(LOAD_DIR, 'dim_movie_categories.csv'),index=False, quotechar='"')

# Create bridge table
df_bridge_fact_movie_categories = df_movies_exploded_categories.merge(df_dim_movie_categories, how='left', on='movie_category')
df_bridge_fact_movie_categories['movie_category_id'] = df_bridge_fact_movie_categories['movie_category_id'].astype(str)
df_bridge_fact_movie_categories = df_bridge_fact_movie_categories[['movie_imdb_id', 'movie_category_id']]

# Save
df_bridge_fact_movie_categories.to_csv(os.path.join(LOAD_DIR, 'bridge_fact_movies_dim_movie_categories.csv'),index=False, quotechar='"')

# --- Dimension and Bridge Tables for Movie Persons (Roles) ---
movie_person_roles = ['director', 'actor', 'writer', 'producer']

for role in movie_person_roles:
    df_role_principals = df_movie_principals[df_movie_principals['movie_person_role'] == role].copy()

    # Bridge Table
    df_bridge_fact_movie_roles = df_role_principals[['movie_imdb_id', 'movie_person_name_id']]
    df_bridge_fact_movie_roles.to_csv(os.path.join(LOAD_DIR, f'bridge_fact_movies_dim_movie_{role}s.csv'), index=False)

    # Dimension Table
    df_dim_movie_role = pd.DataFrame(df_role_principals['movie_person_name_id'].unique(), columns=['movie_person_name_id'])
    df_dim_movie_role = df_dim_movie_role.sort_values('movie_person_name_id').reset_index(drop=True)
    df_dim_movie_role = df_dim_movie_role.merge(df_movie_persons, how='left', on='movie_person_name_id')

    # Rename columns to reflect the specific role
    df_dim_movie_role.columns = [col.replace('movie_person', role) for col in df_dim_movie_role.columns]
    df_dim_movie_role.to_csv(os.path.join(LOAD_DIR, f'dim_movie_{role}s.csv'), index=False)

In [None]:
# =========  DIM_MOVIE_CALENDAR =========
if not pd.isna(calendar_min_year) and not pd.isna(calendar_max_year):
    date_range = pd.date_range(start=f'{int(calendar_min_year)}-01-01', end=f'{int(calendar_max_year)}-12-31')
    df_calendar = pd.DataFrame(date_range, columns=['date'])

    df_calendar['date_id'] = df_calendar['date'].dt.strftime("%Y%m%d")
    df_calendar['year'] = df_calendar['date'].dt.year
    df_calendar['quarter_num'] = df_calendar['date'].dt.quarter
    df_calendar['quarter'] = 'Q' + df_calendar['quarter_num'].astype(str)
    df_calendar['month_num'] = df_calendar['date'].dt.month
    df_calendar['month'] = df_calendar['date'].dt.month_name()
    df_calendar['month_short'] = df_calendar['date'].dt.strftime('%b')
    df_calendar['week_number'] = df_calendar['date'].dt.isocalendar().week.astype(int) # Ensure int for week number
    df_calendar['week_day'] = df_calendar['date'].dt.day_name()
    df_calendar['week_day_short'] = df_calendar['date'].dt.strftime('%a')
    df_calendar['dow_num'] = df_calendar['date'].dt.dayofweek + 1
    df_calendar['day'] = df_calendar['date'].dt.day

    # Save df_calendar
    df_calendar.to_csv(os.path.join(LOAD_DIR, 'dim_movie_calendar.csv'), index=False)

In [None]:
# =========  DIM_MOVIE_USER_RATINGS (MOVIELENS) =========

# --- FUNCTION TO CONVERT ID FORMATS ---
def convert_movielens_imdb_to_full_imdb(movielens_imdb_id):
    """
    Converts a MovieLens IMDb ID (numerical) to the full IMDb ID format (ttXXXXXXX).
    Args: movielens_imdb_id (str or int): The IMDb ID from MovieLens, can be a string or an integer.
    Returns: str: The full IMDb ID in the format 'ttXXXXXXX' or None if the input is invalid.
    """
    if pd.isna(movielens_imdb_id): # Handle NaN values which might occur after merge
        return None

    try:
        numerical_id = int(movielens_imdb_id)
        full_imdb_id = f"tt{numerical_id:07d}"
        return full_imdb_id

    except ValueError:
        return None

# --- DATA EXTRACTION ---

ratings_movielens = pd.read_csv("../../data/raw/movielens/rating.csv")
link_movielens = pd.read_csv("../../data/raw/movielens/link.csv")

# --- MODIFICATIONS ---

user_counts = ratings_movielens["userId"].value_counts()
active_users = user_counts[user_counts >= 25].index
sampled_users = pd.Series(active_users).sample(n=1_000_000, random_state=42)

# - Modify ratings -
ratings_movielens = ratings_movielens.drop(columns=["timestamp"])
ratings_movielens = ratings_movielens[ratings_movielens["userId"].isin(sampled_users)]
ratings_movielens = (ratings_movielens.groupby("userId").apply(lambda x: x.sample(n=min(50, len(x)), random_state=42)).reset_index(drop=True))

# - Merge ratings to link -
df_ratings_full = ratings_movielens.merge(link_movielens, how='left', on='movieId')

# - Make nicer output file & add ratingId -
df_ratings_full= df_ratings_full [["userId","imdbId","rating"]].dropna(subset=["rating"]).reset_index(drop=True)
df_ratings_full.insert(0, "ratingId", df_ratings_full.index + 1)
df_ratings_full["ratingId"] = df_ratings_full.index.map(lambda x: f"R{x+1:06d}")

# - Apply the imdb to conversion -
df_ratings_full["imdbId"] = df_ratings_full["imdbId"].apply(convert_movielens_imdb_to_full_imdb)

# --- SAVE ---

df_ratings_full.to_csv("../../data/cleaned/dim_movie_user_ratings.csv", index=False)