In [1]:
#Import dependencies
import json
import pandas as pd
import numpy as np
import re
from sqlalchemy import create_engine
import psycopg2
from config import db_password
import time

In [2]:
def challengemovie(wiki_movies, kaggle_data, ratings): 
    # load file 
    file_to_load = 'wikipedia.movies.json'
    
    # read file
    with open('wikipedia.movies.json', mode='r') as file:
        wiki_movies_raw = json.load(file)
    
    # Extract Kaggle data
    kaggle_metadata = pd.read_csv('movies_metadata.csv', low_memory=False)
    ratings = pd.read_csv('ratings.csv')
    
    # Create list comprehension to filter wiki movies to include only movies with a director and IMDB link in movie and to filter out Tv shows 
    wiki_movies = [movie for movie in wiki_movies_raw
                   if ('Director' in movie or 'Directed by' in movie)
                       and 'imdb_link' in movie
                       and 'No. of episodes' not in movie]
    def clean_movie(movie):
        movie = dict(movie) #create a non-destructive copy
        alt_titles = {}
        # combine alternate titles into one list
        for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune-Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
            if key in movie:
                alt_titles[key] = movie[key]
                movie.pop(key)
        if len(alt_titles) > 0:
            movie['alt_titles'] = alt_titles

        # merge column names
        def change_column_name(old_name, new_name):
            if old_name in movie:
                movie[new_name] = movie.pop(old_name)
        change_column_name('Adaptation by', 'Writer(s)')
        change_column_name('Country of origin', 'Country')
        change_column_name('Directed by', 'Director')
        change_column_name('Distributed by', 'Distributor')
        change_column_name('Edited by', 'Editor(s)')
        change_column_name('Length', 'Running time')
        change_column_name('Original release', 'Release date')
        change_column_name('Music by', 'Composer(s)')
        change_column_name('Produced by', 'Producer(s)')
        change_column_name('Producer', 'Producer(s)')
        change_column_name('Productioncompanies ', 'Production company(s)')
        change_column_name('Productioncompany ', 'Production company(s)')
        change_column_name('Released', 'Release Date')
        change_column_name('Release Date', 'Release date')
        change_column_name('Screen story by', 'Writer(s)')
        change_column_name('Screenplay by', 'Writer(s)')
        change_column_name('Story by', 'Writer(s)')
        change_column_name('Theme music composer', 'Composer(s)')
        change_column_name('Written by', 'Writer(s)')

        return movie
    
    # Create a list for clean movies using list comprehension
    clean_movies = [clean_movie(movie) for movie in wiki_movies]
    #set the data frame from wiki_movies_df to include the data in clean_movies
    wiki_movies_df = pd.DataFrame(clean_movies)
    
    # extract IMDb ID
    wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
    
    # Remove IMDb_ID duplicates
    wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    
    # List comprehensions clean up for columns to keep 
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    # Put wiki_columns_to_keep in the wiki_movies_df 
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]
    
    # create variable for droped box offcie data 
    box_office = wiki_movies_df['Box office'].dropna() 
    
    # Create a string seprator and then join it 
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    # create from_one and use a regular expression to make data consistent 
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
    
    # create form_two and use a regular expression to make data consistent 
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
    
    # create variable that represent form 1 and 2 to find missing forms
    matches_form_one = box_office.str.contains(form_one, flags=re.IGNORECASE)
    matches_form_two = box_office.str.contains(form_two, flags=re.IGNORECASE)
    
    # find data that begins with $ and ends with hyphen and replace with $ only
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    
    # extract box_offcie using form_one and form_two 
    box_office.str.extract(f'({form_one}|{form_two})')
    
    def parse_dollars(s):
    # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            # return value
            return value

        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value

        # otherwise, return NaN
        else:
            return np.nan
    
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    # drop box offcie column 
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    # create variable for droped budget 
    budget = wiki_movies_df['Budget'].dropna()
    
    # Create a string seprator and then join it 
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    
    # create series to indetify which data does not match 
    matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE)
    matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE)
    budget[~matches_form_one & ~matches_form_two]
    
    # remove citation references
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    budget[~matches_form_one & ~matches_form_two]
    
    # Parse Budget data 
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    # drop budget column 
    wiki_movies_df.drop('Budget', axis=1, inplace=True)
    
    #create a varaible for release dates and convert to a string 
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    # Parse the forms
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    
    #Extract the release date 
    release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)

    # Use built in infer_datetime function in Pandas and set to true 
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)
    
    # create a vriable for running time and convert to string 
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
           
    # extract digits only and allow for patterns in the data to be recognized 
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    
    # convert string into numeric value 
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    
    # convert hours and minutes to minutes if minutes group is 0
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    
    # drop running time 
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
    
    #keep row where adult data false and drop adult column 
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')
    
    # create a  boolean column and assign it back  to video 
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    
    # convert kaggle data to numeric values 
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    
    #convert release date from standard to datetime format
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
    
    # assign rating to timestamp column 
    ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')
    
    # Merge wiki_movies, Kaggle_metadata on IMDb ID.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])
    
    # Drop outlieing movie 
    movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)
    
    # drop title_wiki, release_date_wiki, language and production companies 
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)
    
    # function to fill in missing data and drop columns 
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
            , axis=1)
        df.drop(columns=wiki_column, inplace=True)
        
    # defines what missing data to fill in with zeros 
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')
    
    # reorder the columns 
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                           'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                           'genres','original_language','overview','spoken_languages','Country',
                           'production_companies','production_countries','Distributor',
                           'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                          ]]
    
    #rename the columns 
    movies_df.rename({'id':'kaggle_id',
                      'title_kaggle':'title',
                      'url':'wikipedia_url',
                      'budget_kaggle':'budget',
                      'release_date_kaggle':'release_date',
                      'Country':'country',
                      'Distributor':'distributor',
                      'Producer(s)':'producers',
                      'Director':'director',
                      'Starring':'starring',
                      'Cinematography':'cinematography',
                      'Editor(s)':'editors',
                      'Writer(s)':'writers',
                      'Composer(s)':'composers',
                      'Based on':'based_on'
                     }, axis='columns', inplace=True)
    
    # group by rating and movie ID , rename userID to count , make movie ID the index
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1) \
                    .pivot(index='movieId',columns='rating', values='count')
    
    # prepend rating to each column 
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    
    #left merge
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    
    #fill in blanks with zero values 
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)
    
    # connect sql 
    db_string = f"postgres://postgres:{db_password}@localhost/movie_data"
    
    # create database engine
    engine = create_engine(db_string)
    
    try:
        #delete rows from movies table 
        from sqlalchemy.orm import sessionmaker
        Session = sessionmaker(bind=engine)
        session = Session()
        session.execute('''TRUNCATE TABLE movies''')
        session.commit()
        session.close()
    except:
        print("deleting movie exception")
    try:
        # delete rows from ratings table
        from sqlalchemy.orm import sessionmaker
        Session = sessionmaker(bind=engine)
        session = Session()
        session.execute('''TRUNCATE TABLE ratings''')
        session.commit()
        session.close()
    except:
        print("deleting ratings exception")
    
    # save movie database to sql and append new data if table exists
    movies_df.to_sql(name='movies', con=engine, if_exists='append')
    
    #import rating to sql using chunk size parameter 
    rows_imported = 0
    # get the start_time from time.time()
    start_time = time.time()
    for data in pd.read_csv(f'ratings.csv', chunksize=1000000):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        data.to_sql(name='ratings', con=engine, if_exists='append')
        rows_imported += len(data)

        # add elapsed time to final print out
        print(f'Done. {time.time() - start_time} total seconds elapsed')
        

In [3]:
challengemovie("wiki_movies.json", "movies_metadata.csv", "ratings.csv")

importing rows 0 to 1000000...Done. 139.66332578659058 total seconds elapsed
importing rows 1000000 to 2000000...Done. 278.0085554122925 total seconds elapsed
importing rows 2000000 to 3000000...Done. 417.8144636154175 total seconds elapsed
importing rows 3000000 to 4000000...Done. 562.6736030578613 total seconds elapsed
importing rows 4000000 to 5000000...Done. 711.1630573272705 total seconds elapsed
importing rows 5000000 to 6000000...Done. 853.6116347312927 total seconds elapsed
importing rows 6000000 to 7000000...Done. 988.5526757240295 total seconds elapsed
importing rows 7000000 to 8000000...Done. 1120.1439294815063 total seconds elapsed
importing rows 8000000 to 9000000...Done. 1252.0970404148102 total seconds elapsed
importing rows 9000000 to 10000000...Done. 1383.4827988147736 total seconds elapsed
importing rows 10000000 to 11000000...Done. 1638.500916004181 total seconds elapsed
importing rows 11000000 to 12000000...Done. 1764.9978594779968 total seconds elapsed
importing ro