In [1]:
# Import dependencies
import json
import os
import re
import time

import numpy as np
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

from config import db_password

In [2]:
# Add clean movie function that takes in argument "movie"
def clean_movie(movie):
    """Return a cleaned shallow copy of one scraped movie record.

    Two passes are applied to the copy; the caller's mapping is not modified:

    1. Every alternate-title key that is present is removed and gathered
       into a single ``"alt_titles"`` dict (added only when at least one
       such key was found).
    2. Variant column names are folded into one canonical name. The renames
       run in the listed order, so when several variants map to the same
       target the last one present wins.

    Parameters
    ----------
    movie : mapping
        One movie record; anything accepted by ``dict(...)``.

    Returns
    -------
    dict
        The cleaned copy.
    """
    # Keys holding a title in another language / transliteration.
    alternate_title_keys = (
        "Also known as", "Arabic", "Cantonese", "Chinese", "French",
        "Hangul", "Hebrew", "Hepburn", "Japanese", "Literally",
        "Mandarin", "McCune–Reischauer", "Original title", "Polish",
        "Revised Romanization", "Romanized", "Russian",
        "Simplified", "Traditional", "Yiddish",
    )

    # (old name, canonical name) pairs; order matters because later pairs
    # overwrite a target written by earlier ones.
    column_renames = (
        ("Adaptation by", "Writer(s)"),
        ("Country of origin", "Country"),
        ("Directed by", "Director"),
        ("Distributed by", "Distributor"),
        ("Edited by", "Editor(s)"),
        ("Length", "Running time"),
        ("Original release", "Release date"),
        ("Music by", "Composer(s)"),
        ("Produced by", "Producer(s)"),
        ("Producer", "Producer(s)"),
        ("Productioncompanies ", "Production company(s)"),
        ("Productioncompany ", "Production company(s)"),
        ("Released", "Release Date"),
        ("Release Date", "Release date"),
        ("Screen story by", "Writer(s)"),
        ("Screenplay by", "Writer(s)"),
        ("Story by", "Writer(s)"),
        ("Theme music composer", "Composer(s)"),
        ("Written by", "Writer(s)"),
    )

    # Work on a copy so the input record stays untouched.
    cleaned = dict(movie)

    # Pass 1: pull the alternate titles out into their own dict.
    alt_titles = {}
    for title_key in alternate_title_keys:
        if title_key in cleaned:
            alt_titles[title_key] = cleaned.pop(title_key)
    if alt_titles:
        cleaned["alt_titles"] = alt_titles

    # Pass 2: merge variant column names into the canonical ones.
    for old_name, new_name in column_renames:
        if old_name in cleaned:
            cleaned[new_name] = cleaned.pop(old_name)

    return cleaned

In [3]:
# Create function that reads in, cleans, and loads data from three files containing Wikipedia data, Kaggle metadata, and MovieLens rating data
def extract_transform_load(wiki_data, kaggle_data, ratings_data):
    """Read, clean, merge and load the Wikipedia, Kaggle and MovieLens data.

    Steps, in order:

    1. Read the Kaggle metadata and ratings CSV files and the Wikipedia JSON.
    2. Keep Wikipedia records that have a director and an IMDb link and no
       "No. of episodes" key, clean each with ``clean_movie``, extract the
       IMDb id, drop duplicate ids and drop columns that are >= 90% null.
    3. Parse box office, budget, release date and running time into
       numeric / datetime columns.
    4. Clean the Kaggle metadata and inner-merge it with the Wikipedia
       frame on ``imdb_id``; fill zero Kaggle values from Wikipedia.
    5. Build per-movie rating counts and left-merge them onto the movies.
    6. Write ``movies_df`` to the "movies" table (replacing it) and the raw
       ratings CSV to the "ratings" table in chunks.

    Parameters
    ----------
    wiki_data : str or path-like
        Path to the Wikipedia movies JSON file.
    kaggle_data : str or path-like
        Path to the Kaggle movie metadata CSV file.
    ratings_data : str or path-like
        Path to the MovieLens ratings CSV file. It is read twice: once in
        full for the rating counts and once in chunks for the SQL load.

    Returns
    -------
    None
        The results are written to the ``movie_data`` PostgreSQL database on
        127.0.0.1:5432; progress of the ratings load is printed.
    """

    def join_if_list(value):
        # Some scraped fields are lists of fragments; flatten them to one string.
        return " ".join(value) if isinstance(value, list) else value

    # Read in Kaggle metadata and MovieLens ratings CSV files as Pandas DFs
    kaggle_metadata = pd.read_csv(kaggle_data, low_memory = False)
    ratings = pd.read_csv(ratings_data)

    # Open and read Wikipedia data JSON file.
    # JSON text is UTF-8; be explicit so the platform default encoding cannot break the read.
    with open(wiki_data, mode = "r", encoding = "utf-8") as file:
        wiki_movies_raw = json.load(file)

    # Clean Wikipedia data
    # Filter out TV shows and keep only movies with director and IMDb link
    wiki_movies = [movie for movie in wiki_movies_raw
                   if "No. of episodes" not in movie
                   and ("Director" in movie or "Directed by" in movie)
                   and "imdb_link" in movie]

    # Call clean_movie on each remaining movie
    clean_movies = [clean_movie(movie) for movie in wiki_movies]

    # Read in cleaned movies list as DF
    wiki_movies_df = pd.DataFrame(clean_movies)

    # Extract IMDb ID using regex and drop any imdb_id duplicates.
    # Errors are reported and the pipeline continues (deliberate best-effort).
    try:
        wiki_movies_df["imdb_id"] = wiki_movies_df["imdb_link"].str.extract(r"(tt\d{7})")
        wiki_movies_df = wiki_movies_df.drop_duplicates(subset = "imdb_id")

    # If there is an error, capture and print exception
    except Exception as err:
        print(f"Error occurred, {err=}")

    # Keep columns that have less than 90% null values.
    # .copy() makes the filtered frame independent so the column assignments
    # below do not trigger SettingWithCopyWarning.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns
                            if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep].copy()

    # "form_one": $123.4 million / billion
    form_one = r"\$\s*\d+\.?\d*\s*[mb]illi?on"

    # "form_two": $123,456,789 (comma or period used as thousands separator)
    form_two = r"\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)"

    def parse_dollars(s):
        """Turn an extracted dollar string into a float, or NaN if unrecognized."""

        # If s is not string, return NaN
        if not isinstance(s, str):
            return np.nan

        # If input is of form $###.# million
        if re.match(r"\$\s*\d+\.?\d*\s*milli?on", s, flags = re.IGNORECASE):
            # Remove dollar sign, whitespace and the word, then scale
            return float(re.sub(r"\$|\s|[a-zA-Z]", "", s)) * 10**6

        # If input is of form $###.# billion
        if re.match(r"\$\s*\d+\.?\d*\s*billi?on", s, flags = re.IGNORECASE):
            return float(re.sub(r"\$|\s|[a-zA-Z]", "", s)) * 10**9

        # If input is of the form $###,###,### (or $###.###.###)
        if re.match(form_two, s, flags = re.IGNORECASE):
            # form_two accepts both "," and "." between 3-digit groups, so both
            # must be stripped; leaving "." in made "$1.234.567" raise ValueError.
            return float(re.sub(r"[\$,\.]", "", s))

        # Otherwise, return NaN
        return np.nan

    # Clean box office column in wiki_movies_df
    # Non-null values, with list entries flattened to strings
    box_office = wiki_movies_df["Box office"].dropna().apply(join_if_list)

    # Remove values between dollar sign and hyphen for box office data given as ranges
    box_office = box_office.str.replace(r"\$.*[-—–](?![a-z])", "$", regex = True)

    # Create new clean box office column
    wiki_movies_df["box_office"] = box_office.str.extract(f"({form_one}|{form_two})", flags = re.IGNORECASE)[0].apply(parse_dollars)

    # Drop original box office column
    wiki_movies_df = wiki_movies_df.drop("Box office", axis = 1)

    # Clean budget column in wiki_movies_df
    budget = wiki_movies_df["Budget"].dropna().apply(join_if_list)

    # Remove values between dollar sign and hyphen for budget data given as ranges
    budget = budget.str.replace(r"\$.*[-—–](?![a-z])", "$", regex = True)

    # Remove citation references from budget data
    budget = budget.str.replace(r"\[\d+\]\s*", "", regex = True)

    # Create new clean budget column
    wiki_movies_df["budget"] = budget.str.extract(f"({form_one}|{form_two})", flags = re.IGNORECASE)[0].apply(parse_dollars)

    # Drop original budget column
    wiki_movies_df = wiki_movies_df.drop("Budget", axis = 1)

    # Clean release date column in wiki_movies_df
    release_date = wiki_movies_df["Release date"].dropna().apply(join_if_list)

    # Regexes for the release date layouts, most specific first
    date_form_one = r"(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}"
    date_form_two = r"\d{4}.[01]\d.[0123]\d"
    date_form_three = r"(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}"
    date_form_four = r"\d{4}"

    # Extract and parse dates.
    # NOTE(review): infer_datetime_format is deprecated in pandas 2.x — confirm
    # the target pandas version before removing or replacing it.
    wiki_movies_df["release_date"] = pd.to_datetime(release_date.str.extract(f"({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})")[0], infer_datetime_format = True)

    # Drop original release date column
    wiki_movies_df = wiki_movies_df.drop("Release date", axis = 1)

    # Clean running time column in wiki_movies_df
    running_time = wiki_movies_df["Running time"].dropna().apply(join_if_list)

    # Groups: 0 = hours, 1 = minutes after the hours, 2 = plain minutes
    running_time_extract = running_time.str.extract(r"(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m")

    # Convert strings to numeric values and make empty strings 0
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors = "coerce")).fillna(0)

    # Use hours*60 + minutes when the plain-minutes group is 0, else plain minutes
    wiki_movies_df["running_time"] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis = 1)

    # Drop original running time column
    wiki_movies_df = wiki_movies_df.drop("Running time", axis = 1)

    # Clean Kaggle metadata
    # Keep only rows where adult column is False and drop adult column
    kaggle_metadata = kaggle_metadata[kaggle_metadata["adult"] == "False"].drop("adult", axis = "columns")

    # Convert video column to Boolean data type
    kaggle_metadata["video"] = kaggle_metadata["video"] == "True"

    # Convert budget, id, and popularity columns to numeric data type
    kaggle_metadata["budget"] = kaggle_metadata["budget"].astype(int)
    kaggle_metadata["id"] = pd.to_numeric(kaggle_metadata["id"], errors = "raise")
    kaggle_metadata["popularity"] = pd.to_numeric(kaggle_metadata["popularity"], errors = "raise")

    # Convert release date to datetime data type
    kaggle_metadata["release_date"] = pd.to_datetime(kaggle_metadata["release_date"])

    # Merge wiki_movies_df and kaggle_metadata
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on = "imdb_id", suffixes = ["_wiki", "_kaggle"])

    # Drop redundant Wikipedia columns from merged DF
    movies_df = movies_df.drop(columns = ["title_wiki", "release_date_wiki", "Language", "Production company(s)"])

    # Fill zero Kaggle values with the Wikipedia value, then drop the Wikipedia column.
    # Mutates df in place; the callers below rely on that.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column], axis = 1)
        df.drop(columns = wiki_column, inplace = True)

    # Call function for 3 columns that need missing data filled
    fill_missing_kaggle_data(movies_df, "runtime", "running_time")
    fill_missing_kaggle_data(movies_df, "budget_kaggle", "budget_wiki")
    fill_missing_kaggle_data(movies_df, "revenue", "box_office")

    # Keep and reorder only necessary columns in movies_df
    movies_df = movies_df.loc[:, ["imdb_id","id","title_kaggle","original_title","tagline","belongs_to_collection","url","imdb_link",
                            "runtime","budget_kaggle","revenue","release_date_kaggle","popularity","vote_average","vote_count",
                            "genres","original_language","overview","spoken_languages","Country",
                            "production_companies","production_countries","Distributor",
                            "Producer(s)","Director","Starring","Cinematography","Editor(s)","Writer(s)","Composer(s)","Based on"]]

    # Rename columns in movies_df for consistency
    movies_df = movies_df.rename({"id":"kaggle_id",
                                  "title_kaggle":"title",
                                  "url":"wikipedia_url",
                                  "budget_kaggle":"budget",
                                  "release_date_kaggle":"release_date",
                                  "Country":"country",
                                  "Distributor":"distributor",
                                  "Producer(s)":"producers",
                                  "Director":"director",
                                  "Starring":"starring",
                                  "Cinematography":"cinematography",
                                  "Editor(s)":"editors",
                                  "Writer(s)":"writers",
                                  "Composer(s)":"composers",
                                  "Based on":"based_on"},
                                 axis = "columns")

    # Clean ratings data
    # Get count of each rating for each movie
    rating_counts = ratings.groupby(["movieId", "rating"], as_index = False).count() \
                    .rename({"userId":"count"}, axis = 1) \
                    .pivot(index = "movieId", columns = "rating", values = "count")

    # Add rating_ to beginning of each column name
    rating_counts.columns = ["rating_" + str(col) for col in rating_counts.columns]

    # Merge ratings data into movies_df
    # NOTE(review): movies_with_ratings_df is computed but not written anywhere
    # in this function; the "movies" table below is loaded from movies_df —
    # confirm that is intended.
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on = "kaggle_id", right_index = True, how = "left")

    # Fill in missing rating values
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    # Add movies_df and MovieLens rating CSV data to SQL database
    # Create database engine
    engine = create_engine(f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data")

    try:
        # Import movies_df to SQL table
        movies_df.to_sql(name = "movies", con = engine, if_exists = "replace")

        # Import ratings to SQL table in chunks
        rows_imported = 0
        start_time = time.time()
        for chunk_number, data in enumerate(pd.read_csv(ratings_data, chunksize = 1000000)):

            # Print range of rows being imported
            print(f"importing rows {rows_imported} to {rows_imported + len(data)}...", end = "")

            # The first chunk replaces the table and later chunks append, so
            # re-running the notebook rebuilds "ratings" (like "movies")
            # instead of duplicating every row.
            data.to_sql(name = "ratings", con = engine,
                        if_exists = "replace" if chunk_number == 0 else "append")

            # Increment number of rows imported by the size of this chunk
            rows_imported += len(data)

            # Print that rows have finished importing, with total elapsed time
            print(f"done: {time.time() - start_time} total seconds elapsed")
    finally:
        # Release pooled database connections even if a load step fails.
        engine.dispose()

In [4]:
# Create path to file directory and variables for three files.
# The directory can be overridden with the MOVIES_ETL_RESOURCES environment
# variable so the notebook runs on other machines; when it is unset the
# original hard-coded location is used.
file_dir = os.environ.get(
    "MOVIES_ETL_RESOURCES",
    "/Users/jenamis/Desktop/BootCamp/Module8/Challenge/movies-etl/Resources",
)
# Wikipedia data
wiki_file = f"{file_dir}/wikipedia-movies.json"
# Kaggle metadata
kaggle_file = f"{file_dir}/movies_metadata.csv"
# MovieLens rating data
ratings_file = f"{file_dir}/ratings.csv"

In [5]:
# Run the full ETL pipeline on the three source files
extract_transform_load(
    wiki_data = wiki_file,
    kaggle_data = kaggle_file,
    ratings_data = ratings_file,
)

importing rows 0 to 1000000...done: 18.827893018722534 total seconds elapsed
importing rows 1000000 to 2000000...done: 37.71025109291077 total seconds elapsed
importing rows 2000000 to 3000000...done: 60.165858030319214 total seconds elapsed
importing rows 3000000 to 4000000...done: 78.83414030075073 total seconds elapsed
importing rows 4000000 to 5000000...done: 97.58187913894653 total seconds elapsed
importing rows 5000000 to 6000000...done: 116.21573138237 total seconds elapsed
importing rows 6000000 to 7000000...done: 135.40030121803284 total seconds elapsed
importing rows 7000000 to 8000000...done: 154.1742660999298 total seconds elapsed
importing rows 8000000 to 9000000...done: 172.79736614227295 total seconds elapsed
importing rows 9000000 to 10000000...done: 191.58841705322266 total seconds elapsed
importing rows 10000000 to 11000000...done: 209.71172308921814 total seconds elapsed
importing rows 11000000 to 12000000...done: 228.1201090812683 total seconds elapsed
importing row