In [19]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [20]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
# make copy of the movie within the function to preserve the original
    movie = dict(movie)
    
    # create empty dictionary to hold type of alt title as key and title as value
    alt_titles = {}
    
    # loop over the list of alternate title types to find if each type is in movie
    alternate_titles_columns =['Arabic','Also known as','Cantonese','Chinese','French','Hangul','Hebrew','Hepburn','Japanese',
                          'Literally','Mandarin','McCune–Reischauer','Original title','Polish','Revised Romanization','Romanized',
                           'Russian','Simplified','Traditional','Yiddish']
    for key in alternate_titles_columns:
        
        if key in movie:
            # add the type and title to alt_titles as a key value pair
            alt_titles[key] = movie[key]
           
            # remove from movie
            movie.pop(key)
    
    # if there are alt_titles, add alt_titles to movie
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles
    
    # Merge equivalent column names
    def change_column_name(old_name, new_name):
        if old_name in movie:
            
            movie[new_name] = movie.pop(old_name)
    # call change_column_name function on the desired columns
    change_column_name('Directed by', 'Director')
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin','Country')
    change_column_name('Distributed by','Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Original release', 'Release date')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release date')
    change_column_name('Running time', 'Length')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Voices of', 'Starring')
    change_column_name('Written by', 'Writer(s)')
    
    
    return movie

In [21]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load():
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_meta_data = pd.read_csv(kaggle_file, low_memory=False)
    ratings = pd.read_csv(ratings_file, low_memory=False)

    # Open and read the Wikipedia data JSON file.
    with open(wiki_file, mode='r') as file:
        wiki_movies_raw = json.load(file)
    
    # Write a list comprehension to filter out TV shows.
    wiki_movies = [movie for movie in wiki_movies_raw
                   if ('Director' or 'Directed by' in movie)
                   and'imdb_link' in movie
                   and'No. of episodes' not in movie]

    # Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    clean_wiki_movies = [clean_movie(movie) for movie in wiki_movies]

    # Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(clean_wiki_movies)

    # Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    except:
        print(f'Oops, something went wrong!')


    #  Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[columns_to_keep]

    # Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
    
    # Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

    # Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
    
    # Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?![mb]illion)'

    # Add the parse_dollars function.
    def parse_dollars(s):
        # if is is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and "million"
            s = re.sub('\$|\s|[a-zA-Z]', '', s)

            # convert to float and mulitply by a million
            value = float(s) * 10**6

            # return value
            return value

        # if imput is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and "billion"
            s = re.sub('\$|\s|[a-zA-Z]', '', s)

            # convert to float and mulitply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?![mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value
        # otherwise, return NaN
        else:
            return np.nan
        
    # Clean the box office column in the wiki_movies_df DataFrame.
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True) # clean amounts written as range
    
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    # Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df['Budget'].dropna()
    
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    wiki_movies_df.drop('Budget', axis=1, inplace=True)

    # Clean the release date column in the wiki_movies_df DataFrame.
    release_date = wiki_movies_df["Release date"].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d+,\s\d{4}'
    date_form_two = r'\d{4}.\d{2}.\d{2}'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'

    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)
    
    wiki_movies_df.drop('Release date', axis=1, inplace=True)
    
    # Clean the running time column in the wiki_movies_df DataFrame.
   
    length = wiki_movies_df['Length'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    length_extract = length.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d*)\s*m')
    
    length_extract = length_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    
    wiki_movies_df['running_time'] = length_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis = 1)
    
    wiki_movies_df.drop('Length', axis=1, inplace=True)
    
    # 2. Clean the Kaggle metadata.
    # keep only movies where adult is False
    kaggle_meta_data = kaggle_meta_data[kaggle_meta_data['adult']== 'False'].drop('adult', axis='columns')
    
    # convert object to boolean datatype
    kaggle_meta_data['video'] = kaggle_meta_data['video'] == 'True'
    
    # convert budget to int
    kaggle_meta_data['budget'] = kaggle_meta_data['budget'].astype(int)
    
    # convert id to numeric
    kaggle_meta_data['id'] = pd.to_numeric(kaggle_meta_data['id'], errors='raise')
    
    # convert popularity to numeric
    kaggle_meta_data['popularity'] = pd.to_numeric(kaggle_meta_data['popularity'], errors='raise')
    
    # conver release date to datetime
    kaggle_meta_data['release_date'] = pd.to_datetime(kaggle_meta_data['release_date'])
    
    
    # 3. Merged the two DataFrames into the movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_meta_data, on='imdb_id', suffixes=['_wiki','_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    # redundant columns from merging:
    # Wiki                   Movielens                    Resolution
    # ------------------------------------------------------------------------------
    # title_wiki             title_kaggle                 Drop title_wiki
    # running_time           runtime                      Keep runtime, fill missing values with running_time
    # release_date_wiki      release_date_kaggle          Drop release_date_wiki
    # Production company(s)  production_companies         Drop Production company(s)
    # box_office             revenue                      Keep revenue, fill missing values with box_office
    # Language               original_language            Drop Language
    # budget_wiki            budget_kaggle                Keep budget_kaggle, fill missing values with budget_wiki    
    
    # drop row with corrupt date data
    movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki']>'1996-01-01') & (movies_df['release_date_kaggle']<'1965-01-01')].index)
    movies_df.drop(columns=['title_wiki', 'release_date_wiki', 'Production company(s)', 'Language'], inplace=True)
    
    # 5. Add in the function to fill in the missing Kaggle data.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column]==0 else row[kaggle_column], axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_kaggle_data(movies_df, 'runtime','running_time')
    fill_missing_kaggle_data(movies_df, 'revenue','box_office')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle','budget_wiki')

    # 7. Filter the movies DataFrame for specific columns.
    movies_df.drop(columns='video', inplace=True)

    # 8. Rename the columns in the movies DataFrame.
    # reorder columns
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on']]
    
    movies_df.rename({'id':'kaggle_id',
                 'title_kaggle':'title',
                 'url':'wikipedia_url',
                 'budget_kaggle':'budget',
                 'release_date_kaggle': 'release_date',
                 'Country':'country',
                 'Distributor':'distributor',
                 'Producer(s)':'producers',
                 'Director':'director',
                 'Starring':'starring',
                 'Cinematography':'cinematography',
                 'Editor(s)':'editor',
                 'Writer(s)':'writers',
                 'Composer(s)':'composer',
                 'Based on':'based_on'},axis=1, inplace=True)
    # 9. Transform and merge the ratings DataFrame.
    # groupby movieID then rating
    ratings.groupby(['movieId','rating'], as_index=False).count()
    ratings_counts = ratings.groupby(['movieId','rating'], as_index=False).count().rename({'userId':'count'}, axis=1).pivot(index='movieId', columns='rating', values='count')
    ratings_counts.columns = [f'rating_{col}' for col in ratings_counts.columns]
    movies_with_ratings_df = movies_df.merge(ratings_counts, how='left', left_on='kaggle_id', right_index=True)
    movies_with_ratings_df[ratings_counts.columns] = movies_with_ratings_df[ratings_counts.columns].fillna(0)
    
    # create connection to postgresql database
    "postgres://[user]:[password]@[location]:[port]/[database]"

    # create local server
    db_string = f"postgres://postgres:{db_password}@127.0.0.1:5432/movie_data"
    
    # create database engine
    engine = create_engine(db_string)
    
    # import movies_df to SQL table
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')
    
    # re-import ratings.csv in chunks
    rows_printed = 0

    # get start_time from time.time()
    start_time = time.time()

    for data in pd.read_csv(f'{file_dir}/ratings.csv', chunksize=1000000):

        # print out the range of rows that are being imported
        print(f'importing rows {rows_printed} to {rows_printed + len(data)}...', end='')

        data.to_sql(name='ratings', con=engine, if_exists='append')

        # increment the number of rows imported by the chunksize
        rows_printed += len(data)

    # print that the rows have finished importing
    print(f'Done. {time.time() - start_time} total seconds elapsed')


In [22]:
# 10. Create the path to your file directory and variables for the three files.
file_dir = "C://Users/user/Desktop/Data Bootcamp Analysis Projects/8th Module/Movies_ETL/Resources"
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia.movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [23]:
# 11. Set the three variables equal to the function created in D1.
extract_transform_load()

importing rows 0 to 1000000...importing rows 1000000 to 2000000...importing rows 2000000 to 3000000...importing rows 3000000 to 4000000...importing rows 4000000 to 5000000...importing rows 5000000 to 6000000...importing rows 6000000 to 7000000...importing rows 7000000 to 8000000...importing rows 8000000 to 9000000...importing rows 9000000 to 10000000...importing rows 10000000 to 11000000...importing rows 11000000 to 12000000...importing rows 12000000 to 13000000...importing rows 13000000 to 14000000...importing rows 14000000 to 15000000...importing rows 15000000 to 16000000...importing rows 16000000 to 17000000...importing rows 17000000 to 18000000...importing rows 18000000 to 19000000...importing rows 19000000 to 20000000...importing rows 20000000 to 21000000...importing rows 21000000 to 22000000...importing rows 22000000 to 23000000...importing rows 23000000 to 24000000...importing rows 24000000 to 25000000...importing rows 25000000 to 26000000...importing rows 26000000 to 26024289..

TypeError: cannot unpack non-iterable NoneType object