In [1]:
#dependencies
import json
import pandas as pd
import numpy as np
import re
from sqlalchemy import create_engine
import psycopg2
from config import db_password
import time

In [2]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    """Return a cleaned copy of one raw Wikipedia movie record.

    The input is never modified; all work happens on a shallow copy.
    Two clean-ups are applied:

    * Every alternative-title field that is present is removed from the
      record and gathered into a single ``'alt_titles'`` dict. The key is
      only added when at least one alternative title was found.
    * Columns that describe the same thing are renamed to one canonical
      name. When several old names map to the same new name (for example
      'Story by' and 'Written by' -> 'Writer(s)'), the rename applied last
      overwrites the earlier value.

    Args:
        movie: a mapping (anything accepted by ``dict()``) holding one
            movie's raw fields.

    Returns:
        dict: the cleaned copy of ``movie``.
    """
    movie = dict(movie)  # create a non-destructive copy

    # alternative-title fields that get collapsed into one column
    alt_title_keys = ('Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French',
                      'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally',
                      'Mandarin', 'McCune–Reischauer', 'Original title', 'Polish',
                      'Revised Romanization', 'Romanized', 'Russian',
                      'Simplified', 'Traditional', 'Yiddish')

    # pull every alternative title that exists out of the record ...
    alt_titles = {key: movie.pop(key) for key in alt_title_keys if key in movie}

    # ... and store them once, after the loop (the original re-ran this check
    # on every iteration because it was indented inside the loop)
    if alt_titles:
        movie['alt_titles'] = alt_titles

    # (old name, canonical name) pairs, applied in this order
    # see ColumnChanges.xlsx for work on deciding these columns
    # NOTE(review): the trailing space in the two 'Productioncompan...' keys is
    # kept exactly as written - presumably it matches the raw data; verify.
    column_renames = (
        ('Adaptation by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Music by', 'Composer(s)'),
        ('Original language(s)', 'Language'),
        ('Original release', 'Release date'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Voices of', 'Starring'),
        ('Written by', 'Writer(s)'),
    )

    # condense similar columns into one
    for old_name, new_name in column_renames:
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)

    return movie

In [3]:
# 1 Define the ETL function. It takes no arguments: the paths to the Wikipedia data,
# Kaggle metadata, and MovieLens rating data (from Kaggle) are read from notebook-level variables.

def extract_transform_load():
    """Run the whole movie ETL: read the source files, clean and merge them, load PostgreSQL.

    Takes no arguments. Inputs come from notebook-level names:
    ``wiki_file`` (Wikipedia JSON), ``kaggle_file`` (Kaggle metadata CSV),
    ``ratings_file`` (MovieLens ratings CSV) and ``db_password``.

    Steps:
        1. Read the three files.
        2. Filter the Wikipedia records to movies, clean each one with
           ``clean_movie`` and parse box office, budget, release date and
           running time into numeric / datetime columns.
        3. Clean the Kaggle metadata and inner-merge it with the Wikipedia
           data on ``imdb_id``; fill Kaggle zeros from the Wikipedia values.
        4. Build a per-movie rating-count pivot from the ratings.
        5. Write the merged movies to the ``movies`` table and stream the raw
           ratings file to the ``ratings`` table in chunks, printing progress.

    Returns:
        None. The results are the two database tables and the printed log.
    """
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(kaggle_file, low_memory=False)
    ratings = pd.read_csv(ratings_file)

    # Open and read the Wikipedia data JSON file.
    # JSON is UTF-8 by specification; do not depend on the platform default encoding.
    with open(wiki_file, mode='r', encoding='utf-8') as file:
        wiki_movies_raw = json.load(file)

    # Filter out TV shows: keep records with a director and an IMDb link
    # that do not list a number of episodes.
    wiki_movies = [movie for movie in wiki_movies_raw
                   if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]

    # Call the clean_movie function on each movie and build a DataFrame.
    clean_movies = [clean_movie(movie) for movie in wiki_movies]
    wiki_movies_df = pd.DataFrame(clean_movies)

    # Extract the IMDb ID with a regular expression and drop duplicate IDs.
    # If there is an error, capture and print the exception (the original
    # bare `except:` printed a whole column and hid the actual error).
    try:
        wiki_movies_df["imdb_id"] = wiki_movies_df["imdb_link"].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    except Exception as error:
        print(f'Issue extracting IMDb IDs: {error}')

    # Keep only the columns in which fewer than 90% of the values are null.
    # .copy() makes the result an independent frame so the column
    # assignments below do not trigger SettingWithCopyWarning.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns
                            if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep].copy()

    def join_if_list(value):
        """Join a list of strings with spaces; return anything else unchanged."""
        return ' '.join(value) if isinstance(value, list) else value

    # Non-null values from the "Box office" column, converted to strings.
    box_office = wiki_movies_df['Box office'].dropna().apply(join_if_list)

    # condense ranges
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)

    # "form_one" of the money data: $ 53 [mb]illion
    form_one = r"\$\s*\d+\.?\d*\s*[mb]ill?i?on"

    # "form_two" of the money data: $ 123,500,000
    # (a period is also accepted as the thousands separator)
    form_two = r"\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illi?on)"

    def parse_dollars(s):
        """Convert one extracted money string to a float number of dollars.

        Returns NaN when ``s`` is not a string or matches none of the
        recognised forms.
        """
        # if s is not a string, return NaN
        if not isinstance(s, str):
            return np.nan

        # $###.# million: strip "$", whitespace and letters, scale by a million
        if re.match(r'\$\s*\d+\.?\d*\s*mill?i?on', s, flags=re.IGNORECASE):
            return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**6

        # $###.# billion: strip "$", whitespace and letters, scale by a billion
        if re.match(r'\$\s*\d+\.?\d*\s*bill?i?on', s, flags=re.IGNORECASE):
            return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**9

        # $###,###,### : the pattern only allows "," or "." followed by exactly
        # three digits, i.e. thousands separators, so both are removed. The
        # original removed commas only, so "$1.234.567" raised ValueError and
        # "$5.000" was read as 5.0.
        if re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illi?on)', s, flags=re.IGNORECASE):
            return float(re.sub(r'\$|\s|,|\.', '', s))

        # otherwise, return NaN
        return np.nan

    # Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(
        f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)

    # Clean the budget column in the wiki_movies_df DataFrame.
    # create budget series to work with
    budget = wiki_movies_df['Budget'].dropna().apply(join_if_list)

    # condense ranges
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)

    # get rid of citations such as "[12]"
    # regex=True is required: since pandas 2.0 the default is a literal match,
    # which would leave every citation in place.
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)

    # apply the parse dollars function (the raw 'Budget' column is kept here;
    # it is discarded later by the final column selection)
    wiki_movies_df['budget'] = budget.str.extract(
        f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    # Clean the release date column in the wiki_movies_df DataFrame.
    # get release date series to work with
    release_date = wiki_movies_df['Release date'].dropna().apply(join_if_list)

    # define date forms
    # ( 1994-01-02 )
    date_form_one = r'\(\s\d{4}.+\d{1,2}.+\d{1,2}\s\)'
    # January 2, 1994
    date_form_two = r'[a-zA-Z]{3,9}\s*\d{1,2},?\s*\d{4}'
    # January 1994
    date_form_three = r'[a-zA-Z]{3,9}\s*\d{4}'
    # 1994
    date_form_four = r'\d{4}'

    # put the parsed release dates in the dataframe
    # NOTE(review): infer_datetime_format is deprecated in pandas 2.x - confirm
    # against the installed pandas version before changing this call.
    wiki_movies_df['release_date'] = pd.to_datetime(
        release_date.str.extract(
            f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})',
            flags=re.IGNORECASE)[0],
        infer_datetime_format=True)

    # Clean the running time column in the wiki_movies_df DataFrame.
    # get running time series to work with
    running_time = wiki_movies_df['Running time'].dropna().apply(join_if_list)

    # two main length formats: 124 m[inutes] or 2 h[ours] 35 [minutes]
    # groups 0/1 capture hours and minutes, group 2 captures plain minutes
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(
        lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)

    # convert the capture groups to total minutes, drop the old column
    wiki_movies_df['running_time'] = running_time_extract.apply(
        lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    wiki_movies_df.drop('Running time', axis=1, inplace=True)

    # Clean the Kaggle metadata.

    # keep only non-adult films and drop the column
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult', axis='columns')

    # convert video col to bool
    kaggle_metadata['video'] = kaggle_metadata['video'] == True

    # work on numeric columns
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')

    # convert release date to datetime
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

    # Merge the two DataFrames into the movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki', '_kaggle'])

    # Drop unnecessary columns from the merged DataFrame.
    # Wiki                     Movielens                Resolution
    #--------------------------------------------------------------------------
    # title_wiki               title_kaggle             drop wikipedia titles col
    # running_time             runtime                  keep kaggle, fill in 0s from wikipedia
    # budget_wiki              budget_kaggle            keep kaggle, fill in 0s from wikipedia
    # box_office               revenue                  keep kaggle, fill in 0s from wikipedia
    # release_date_wiki        release_date_kaggle      drop wikipedia release date col
    # Language                 original_language        drop wikipedia language col
    # Production company(s)    production_companies     drop wikipedia production company(s) col

    movies_df.drop(columns=['title_wiki', 'release_date_wiki', 'Language', 'Production company(s)'], inplace=True)

    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        """Replace zeros in ``kaggle_column`` with ``wiki_column`` values, then drop ``wiki_column`` (in place)."""
        # infill the 0s
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column], axis=1)

        # drop the wiki column
        df.drop(columns=wiki_column, inplace=True)

    # Fill in the missing Kaggle data from the Wikipedia columns.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # Filter the movies DataFrame for specific columns.
    movies_df.drop(columns=['video'], inplace=True)
    movies_df = movies_df.loc[:, ['imdb_id', 'id', 'title_kaggle', 'original_title', 'tagline', 'belongs_to_collection', 'url', 'imdb_link',
                                  'runtime', 'budget_kaggle', 'revenue', 'release_date_kaggle', 'popularity', 'vote_average', 'vote_count',
                                  'genres', 'original_language', 'overview', 'spoken_languages', 'Country',
                                  'production_companies', 'production_countries', 'Distributor',
                                  'Producer(s)', 'Director', 'Starring', 'Cinematography', 'Editor(s)', 'Writer(s)', 'Composer(s)', 'Based on'
                                  ]]

    # Rename the columns in the movies DataFrame.
    movies_df.rename({'id': 'kaggle_id',
                      'title_kaggle': 'title',
                      'url': 'wikipedia_url',
                      'budget_kaggle': 'budget',
                      'release_date_kaggle': 'release_date',
                      'Country': 'country',
                      'Distributor': 'distributor',
                      'Producer(s)': 'producers',
                      'Director': 'director',
                      'Starring': 'starring',
                      'Cinematography': 'cinematography',
                      'Editor(s)': 'editors',
                      'Writer(s)': 'writers',
                      'Composer(s)': 'composers',
                      'Based on': 'based_on'
                      }, axis='columns', inplace=True)

    # Transform and merge the ratings DataFrame.
    # create ratings pivot table, indexed by movie id: one column per rating
    # value holding how many times that rating was given
    ratings_grouped = ratings.groupby(['movieId', 'rating'], as_index=False).count() \
                             .rename({'userId': 'Count'}, axis=1) \
                             .pivot(index='movieId', columns='rating', values='Count')

    # make columns more descriptive
    ratings_grouped.columns = ['rating_' + str(col) for col in ratings_grouped.columns]

    # merge the ratings df into the movies df; movies without ratings get 0 counts
    # NOTE(review): movies_with_ratings_df is built but not written to the
    # database below (only movies_df is) - confirm that is intended.
    movies_with_ratings_df = pd.merge(movies_df, ratings_grouped, how='left', left_on='kaggle_id', right_index=True)
    movies_with_ratings_df[ratings_grouped.columns] = movies_with_ratings_df[ratings_grouped.columns].fillna(0)

    # create connection to postgres
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"

    # create engine
    engine = create_engine(db_string)

    # send movies to postgres
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')

    # create a variable for the number of rows imported, and time started
    rows_imported = 0
    start_time = time.time()

    # stream the raw ratings to postgres in chunks, reading the same file that
    # was loaded above (the original rebuilt the path from file_dir)
    for data in pd.read_csv(ratings_file, chunksize=1000000):

        print(f'importing rows {rows_imported: ,} to {rows_imported + len(data): ,}...', end='')

        # The first chunk replaces any existing table so that re-running the
        # notebook does not duplicate every rating; later chunks append.
        data.to_sql(name='ratings', con=engine,
                    if_exists='replace' if rows_imported == 0 else 'append')
        rows_imported += len(data)

        # print that the rows have finished importing
        print(f'Done. {round(time.time() - start_time, 2)} seconds have elapsed')



In [4]:
# Create the path to your file directory and variables for the three files.
# file_dir keeps its trailing slash, so the file names are appended directly
# (adding another "/" produced paths such as "Data//ratings.csv").
file_dir = 'Data/'
# The Wikipedia data
wiki_file = f'{file_dir}wikipedia-movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}ratings.csv'

In [5]:
# Run the program: the call takes no arguments and returns nothing.
# It writes the 'movies' and 'ratings' tables to the movie_data Postgres
# database and prints one progress line per chunk of ratings imported.
extract_transform_load()



importing rows  0 to  1,000,000...Done. 16.03 seconds have elapsed
importing rows  1,000,000 to  2,000,000...Done. 31.44 seconds have elapsed
importing rows  2,000,000 to  3,000,000...Done. 47.2 seconds have elapsed
importing rows  3,000,000 to  4,000,000...Done. 62.6 seconds have elapsed
importing rows  4,000,000 to  5,000,000...Done. 77.92 seconds have elapsed
importing rows  5,000,000 to  6,000,000...Done. 93.6 seconds have elapsed
importing rows  6,000,000 to  7,000,000...Done. 109.25 seconds have elapsed
importing rows  7,000,000 to  8,000,000...Done. 124.7 seconds have elapsed
importing rows  8,000,000 to  9,000,000...Done. 140.1 seconds have elapsed
importing rows  9,000,000 to  10,000,000...Done. 155.65 seconds have elapsed
importing rows  10,000,000 to  11,000,000...Done. 171.53 seconds have elapsed
importing rows  11,000,000 to  12,000,000...Done. 186.85 seconds have elapsed
importing rows  12,000,000 to  13,000,000...Done. 202.41 seconds have elapsed
importing rows  13,000,0