In [51]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [52]:
# 1. Add the clean movie function that takes in the argument, "movie".
def clean_movie_func(movie):
    movie = dict(movie) #create a non-destructive copy
    alt_titles = {}
    # combine alternate titles into one list
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune-Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles

    # merge column names
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')

    return movie

In [85]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

# def extract_transform_load(wiki_file, kaggle_file, ratings_file):
def extract_transform_load():
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(kaggle_file, low_memory=False)
    ratings = pd.read_csv(ratings_file)

    # Open and read the Wikipedia data JSON file.
    with open(wiki_file, mode='r') as file:
        wiki_movies_raw = json.load(file)
    
    # Write a list comprehension to filter out TV shows.
    wiki_movies = [movie for movie in wiki_movies_raw
               if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]

    # Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    clean_movies = [clean_movie_func(movie) for movie in wiki_movies]

    # Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(clean_movies)

    # Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
 
    except Exception as e:
        print(e)

    #  Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() 
                            < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]

    # Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
    test = wiki_movies_df.columns.to_list()

    # Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

    # Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'

    # Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
    
    # This wasn't asked for, but it's in the regular module and seems harmless:
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    
    # Add the parse_dollars function.
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            # return value
            return value

        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value

        # otherwise, return NaN
        else:
            return np.nan
        
    # Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    test = wiki_movies_df.columns.to_list()
    
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    # Clean the budget column in the wiki_movies_df DataFrame.
    # Create a budget variable
    budget = wiki_movies_df['Budget'].dropna()
    # Convert any lists to strings
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    # Then remove any values between a dollar sign and a hyphen (for budgets given in ranges)
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    # The above line was not asked for, so it may be extraneous. 
    # Use the same pattern matches that were created to parse the box office data
    matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE)
    matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE)
    # Remove the citation references (Also not asked for)
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    # Discard non-standard entries.
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    # Clean the release date column in the wiki_movies_df DataFrame.
    # Parse Release Date
    # Make a variable that holds the non-null values of Release date in the DataFrame, converting lists to strings
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    # Forms to parse:
    #    Full month name, one- to two-digit day, four-digit year (i.e., January 1, 2000)
    #    Four-digit year, two-digit month, two-digit day, with any separator (i.e., 2000-01-01)
    #    Full month name, four-digit year (i.e., January 2000)
    #    Four-digit year
    # 
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    # Parse dates with built-in to_datetime() method
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)
    
    # Clean the running time column in the wiki_movies_df DataFrame.
    # Parse Running Time
    # Make a variable that holds the non-null values of Release date in the DataFrame, converting lists to strings
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    # Extract trying to capture a few more rows by relaxing some requirements and allowing for hours.
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    # Need to convert the strings above to numeric values and set the errors argument to 'coerce' to force empty strings to NaN's
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    # Now we can apply a function that will convert the hour capture groups and minute capture groups to minutes 
    # if the pure minutes capture group is zero, and save the output to wiki_movies_df
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    # Drop Running time from the dataset
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
     
    # 2. Clean the Kaggle metadata.
    # Keep the rows where the adult column is False, and then drop the adult column.
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')
    # Convert the video column.
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    # Convert columns to numeric.
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    # Convert release_date to datetime.
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
    
    # 3. Merged the two DataFrames into the movies DataFrame.
    # Merge tables by IMDb ID. Use the suffixes parameter to make it easier to identify which table each column came from.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])
    
    # Somehow The Holiday in the Wikipedia data got merged with From Here to Eternity. 
    # We had to drop that row from our DataFrame in the regular module, so doing it here. 
    # Actually, this causes the row count to be one less than expected, so commenting this out.
    # movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)
      
    # 4. Drop unnecessary columns from the merged DataFrame.
    # Drop the title_wiki, release_date_wiki, Language, and Production company(s) columns.
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)
    

    # 5. Add in the function to fill in the missing Kaggle data.
    # Create a function that fills in missing data for a column pair and then drops the redundant column.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
            , axis=1)
        df.drop(columns=wiki_column, inplace=True)
        
    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    # Call the function for the three column pairs where we decided it's needed.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')


    # 7. Filter the movies DataFrame for specific columns.
    # Not sure what's being asked here. 
    # Guess I'll drop the 'video' column since in the regular module it was noted 
    # that it only had one value, even though it didn't actually drop it.
    movies_df.drop('video', axis=1, inplace=True)

    # 8. Rename the columns in the movies DataFrame.
    # Not sure if the columns need to be reordered first, doing it anyway. 
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]
    # Rename the columns to be consistent.
    movies_df.rename({'id':'kaggle_id',
                      'title_kaggle':'title',
                      'url':'wikipedia_url',
                      'budget_kaggle':'budget',
                      'release_date_kaggle':'release_date',
                      'Country':'country',
                      'Distributor':'distributor',
                      'Producer(s)':'producers',
                      'Director':'director',
                      'Starring':'starring',
                      'Cinematography':'cinematography',
                      'Editor(s)':'editors',
                      'Writer(s)':'writers',
                      'Composer(s)':'composers',
                      'Based on':'based_on'
                     }, axis='columns', inplace=True)

    # 9. Transform and merge the ratings DataFrame.
    # Used a groupby on the "movieId" and "rating" columns and took the count for each group.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count().rename({'userId':'count'}, axis=1).pivot(index='movieId',columns='rating', values='count')
    # Rename
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    # Merge
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    # fill nan's with zeroes.
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    # Putting in a sleep in case the error I'm getting on the movies_df load is 
    # a race condition. It seems to load anyway, and the count is right. 
    time.sleep(60)
    # Setup the database engine connection
    db_string = f"postgres://postgres:{db_password}@127.0.0.1:5432/movie_data"
    # Create the engine
    engine = create_engine(db_string)
    # Save movies_df to movies table
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')
    
    # Before reading in the MovieLens rating CSV data, drop the ratings table in pgAdmin.
    # I think I can do it here with a little help from stackoverflow.com
    # ...or maybe not. Let's see. 
    connection = engine.raw_connection()
    cursor = connection.cursor()
    # Not sure about this line:
    cursor.execute("DROP TABLE IF EXISTS ratings")
    connection.commit()
    connection.close()
    
    # Import the Ratings Data
    rows_imported = 0
    # pd.read_csv doesn't seem to like the original file path specification, but this works: 
    file_dir = '/Users/RobertEnno/Desktop/boot_camp/repo/modules/Movies-ETL/Resources'
    # get the start_time from time.time()
    start_time = time.time()
    for data in pd.read_csv(f'{file_dir}/ratings.csv', chunksize=1000000):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        data.to_sql(name='ratings', con=engine, if_exists='append')
        rows_imported += len(data)

        # add elapsed time to final print out
        print(f'Done. {time.time() - start_time} total seconds elapsed')


    # Return three variables. The first is the wiki_movies_df DataFrame 
    return wiki_movies_df, kaggle_metadata, ratings, test

In [86]:
# 10. Create the path to your file directory and variables for the three files.
file_dir = 'Resources'
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia-movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [87]:
# Refactor Step 11 of Deliverable 3 so that you pass in the variables for the files created in 
# Step 10 of Deliverable 3 in the function created in Deliverable 1.
# extract_transform_load('/Users/RobertEnno/Desktop/boot_camp/repo/modules/Movies-ETL/Resources/wikipedia-movies.json', 
#                        '/Users/RobertEnno/Desktop/boot_camp/repo/modules/Movies-ETL/Resources/movies_metadata.csv', 
#                        '/Users/RobertEnno/Desktop/boot_camp/repo/modules/Movies-ETL/Resources/ratings.csv')
wiki_file, kaggle_file, ratings_file, test = extract_transform_load()

importing rows 0 to 1000000...Done. 188.17499232292175 total seconds elapsed
importing rows 1000000 to 2000000...Done. 382.0345253944397 total seconds elapsed
importing rows 2000000 to 3000000...Done. 572.6242411136627 total seconds elapsed
importing rows 3000000 to 4000000...Done. 747.042192697525 total seconds elapsed
importing rows 4000000 to 5000000...Done. 907.3295516967773 total seconds elapsed
importing rows 5000000 to 6000000...Done. 1066.4675023555756 total seconds elapsed
importing rows 6000000 to 7000000...Done. 1226.7092416286469 total seconds elapsed
importing rows 7000000 to 8000000...Done. 1391.7640376091003 total seconds elapsed
importing rows 8000000 to 9000000...Done. 1554.3506982326508 total seconds elapsed
importing rows 9000000 to 10000000...Done. 1716.6498494148254 total seconds elapsed
importing rows 10000000 to 11000000...Done. 1881.1180837154388 total seconds elapsed
importing rows 11000000 to 12000000...Done. 2044.190948009491 total seconds elapsed
importing r