In [14]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [15]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    movie = dict(movie) #create a non-destructive copy, local to function only
    
    #Combine alternate titles into one list
    alt_titles = {}
    for key in  ['Also known as','Arabic','Cantonese','Chinese','French','Hangul','Hebrew',
                    'Hepburn','Japanese','Literally','Mandarin','McCune–Reischauer','Original title',
                    'Polish', 'Revised Romanization','Romanized','Russian','Simplified','Traditional','Yiddish']:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles
        
    #merge duplicate column names
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name('Directed by','Director')
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin','Country')
    change_column_name('Composer','Composer(s)')
    change_column_name('Directed by','Director')
    change_column_name('Distributed by','Distributor(s)')
    change_column_name('Distributor','Distributor(s)')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length','Running Time')
    change_column_name('Original Release','Release Date')
    change_column_name('Music by','Composer(s)')
    change_column_name('Produced by','Producer(s)')
    change_column_name('Producer','Producer(s)')
    change_column_name('Productioncompanies ','Production company(s)')
    change_column_name('Productioncompany ','Production company(s)')
    change_column_name('released','Release Date')
    change_column_name('Released','Release Date')
    change_column_name('Release Date','Release date')
    change_column_name('Running Time','Running time')
    change_column_name('Screen story by','Writer(s)')
    change_column_name('Screenplay by','Writer(s)')
    change_column_name('Story by','Writer(s)')
    change_column_name('Theme music composer','Composer(s)')
    change_column_name('Written by','Writer(s)')
    
    return movie

In [16]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def hackathon_bring_in_data(wiki_data, kaggle_data, rating_data ):
    
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    
    kaggle_data_df = pd.read_csv(kaggle_data,low_memory = False)
    ratings_df = pd.read_csv(rating_data)
    
    # Open and read the read the Wikipedia data JSON file.
    with open(wiki_data,mode = 'r') as file:
        wiki_movies_raw = json.load(file)
    
    # 3. Write a list comprehension to filter out TV shows.
    
        wiki_movies = [movie for movie in wiki_movies_raw
              if ('Director' in movie or 'Directed by' in movie)
              and 'imdb_link' in movie
              and "No. of episodes" not in movie]
    
    # 4. Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
        clean_movies = [clean_movie(movie) for movie in wiki_movies]
    
    # 5. Read in the cleaned movies list from Step 4 as a DataFrame.
        wiki_movies_df = pd.DataFrame(clean_movies)
    
    # 6. Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    
   
        #declare an array to temporarily hold imdb ids to check for duplicates

        for index, row in wiki_movies_df.iterrows():
           
            try:

                wiki_movies_df['imdb_id']=wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
                wiki_movies_df.drop_duplicates(subset = 'imdb_id',inplace=True)
                
            except:
                print(f'Error extracting IMDb ID, skipping')
                pass 
    
     #  7. Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
        wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
        wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]
        
     # 8. Create a variable that will hold the non-null values from the “Box office” column.
        
        box_office = wiki_movies_df['Box office'].dropna()
    # 9. Convert the box office data created in Step 8 to string values using the lambda and join functions.
    
        box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)
        
    # 10. Write a regular expression to match the six elements of "form_one" of the box office data.
        form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
    # 11. Write a regular expression to match the three elements of "form_two" of the box office data.
    
        form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
    # 12. Add the parse_dollars function.
    
        def parse_dollars(s):
        # if s is not a string, return NaN
            if type(s) != str:
                return np.nan

            # if input is of the form $###.# million
            if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

                # remove dollar sign and " million"
                s = re.sub('\$|\s|[a-zA-Z]','', s)

                # convert to float and multiply by a million
                value = float(s) * 10**6

                # return value
                return value

            # if input is of the form $###.# billion
            elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

                # remove dollar sign and " billion"
                s = re.sub('\$|\s|[a-zA-Z]','', s)

                # convert to float and multiply by a billion
                value = float(s) * 10**9

                # return value
                return value

            # if input is of the form $###,###,###
            elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

                # remove dollar sign and commas
                s = re.sub('\$|,','', s)

                # convert to float
                value = float(s)

                # return value
                return value

            # otherwise, return NaN
            else:
                return np.nan
        
    # 13. Clean the box office column in the wiki_movies_df DataFrame.

        wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags = re.IGNORECASE)[0].apply(parse_dollars)
        wiki_movies_df.drop('Box office', axis=1, inplace = True)
        
    # 14. Clean the budget column in the wiki_movies_df DataFrame.
    
        #create a variable that will hold the non null values of budget
        budget = wiki_movies_df['Budget'].dropna()
        
        #Convert the budget variable to string values using the lambda and join functions.
        budget = budget.apply(lambda x: ' '.join(x) if type(x) == list else x)
        
        #remove any extra stuff from between the dollar sign and the hyphen
        budget = budget.str.replace(r'\$.*[---](?![a-z])','$',regex=True)
        
        #remove citation references
        budget = budget.str.replace(r'\[\d+\]\s*','')
        
        #apply parse dollars to budget column
        wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags = re.IGNORECASE)[0].apply(parse_dollars)
        
        #remove old, uncleaned budget column
        wiki_movies_df.drop('Budget', axis=1, inplace = True)
        
    # 15. Clean the release date column in the wiki_movies_df DataFrame.
    
        #create a variable that will hold the non null values of release date
        release_date = wiki_movies_df['Release date'].dropna()
        
        #Convert the budget variable to string values using the lambda and join functions.
        release_date = release_date.apply(lambda x: ' '.join(x) if type(x) == list else x)
        
        #create regex expressions for the four forms of dates in the data
        date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
        date_form_two = r'\d{4}.[01]\d.[0123]\d'
        date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
        date_form_four = r'\d{4}'
        
        #use the regex forms to extract the dates and pd.to_datetime to make standardized date formats for release date
        wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format = True)
        
        #remove old, uncleaned release date column
        wiki_movies_df.drop('Release date', axis=1, inplace = True)
    
    # 16. Clean the running time column in the wiki_movies_df DataFrame.
        
        #create a variable that will hold the non null values of release date
        running_time = wiki_movies_df['Running time'].dropna()
        
        #Convert the budget variable to string values using the lambda and join functions. 
        running_time = running_time.apply(lambda x: ' '.join(x) if type(x) == list else x)
        
        #use regex to extract the running time from the data
        running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
        
        #convert strings to numbers we can apply math functions to
        running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
        
        #convert numeric hours, minutes, and seconds (if they exist) to a standardized runtime
        wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
        
        #remove old, uncleaned runtime column
        wiki_movies_df.drop('Running time', axis=1, inplace = True)
        
    # 2. Clean the Kaggle metadata.
    
        #Clean up data types
        
        #Keep rows where adult column is false and drop the column - would convert to boolean but keep non-adult
        #films and delete adult column instead
        
        kaggle_data_df = kaggle_data_df[kaggle_data_df['adult'] == 'False'].drop('adult',axis='columns')
        
        #Convert to boolean datatype
        kaggle_data_df['video'] = kaggle_data_df['video'] == 'True'
        
        #Convert to numeric datatype
        kaggle_data_df['budget'] = kaggle_data_df['budget'].astype(int)
        kaggle_data_df['id'] = pd.to_numeric(kaggle_data_df['id'], errors='raise')
        kaggle_data_df['popularity'] = pd.to_numeric(kaggle_data_df['popularity'], errors='raise')

        #Convert to a date datatype
        kaggle_data_df['release_date'] = pd.to_datetime(kaggle_data_df['release_date'])
    
    # 3. Merged the two DataFrames into the movies DataFrame.
        movies_df = pd.merge(wiki_movies_df, kaggle_data_df, on='imdb_id', suffixes=['_wiki','_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
        
        #Drop outlier/corrupted release date row
        #movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)
        
        #Drop wiki title, wiki release date, language, production company because we are keeping the other versions of these
        movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)
        
    # 5. Add in the function to fill in the missing Kaggle data.
        def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
            df[kaggle_column] = df.apply(
                lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
                , axis=1)
            df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    
        fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
        fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
        fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')


    # 7. Filter the movies DataFrame for specific columns.
        movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor(s)',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]


    # 8. Rename the columns in the movies DataFrame.
        movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor(s)':'distributor(s)',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

    # 9. Transform and merge the ratings DataFrame.
    
    #Convert timestamp to a date datatype
        ratings_df['timestamp'] = pd.to_datetime(ratings_df['timestamp'], unit='s')
        
        #Compress huge ratings data file by counting the number of times each movie received each rating
        rating_counts = ratings_df.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')
        
        #Rename the columns to show what they are - counts by rating, with the rating as the column name
        rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
        
        #Merge the ratings into the big movie df
        movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
        movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)
        
    #9.5 After Step 9, Transform and merge the ratings DataFrame, add the code to create the connection to the PostgreSQL 
    #    database, then add the movies_df DataFrame to a SQL database.
    
        db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
        engine = create_engine(db_string)
        movies_df.to_sql(name='movies', con=engine,if_exists = "replace")
        # create a variable for the number of rows imported
        rows_imported = 0
        # get the start_time from time.time()
        start_time = time.time()

        for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):

            # print out the range of rows that are being imported
            print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

            data.to_sql(name='ratings', con=engine, if_exists='append')

            # increment the number of rows imported by the size of 'data'
            rows_imported += len(data)

            # print that the rows have finished importing
            # add elapsed time to final print out
            print(f'Done. {time.time() - start_time} total seconds elapsed')
        
    # Return three variables. The first is the wiki_movies_df DataFrame
    #    return wiki_movies_df, movies_with_ratings_df, movies_df



In [17]:
# 10. Create the path to your file directory and variables for the three files.

file_dir = 'C://Users/mgsri/OneDrive/Desktop/Analysis Projects/Movies-ETL/'
# Wikipedia data

wiki_file = f'{file_dir}wikipedia-movies.json'
# Kaggle metadata
kaggle_file = f'{file_dir}movies_metadata.csv'

# MovieLens rating data.
ratings_file = f'{file_dir}ratings.csv'


In [18]:
# 11. Run the function to import the three data files
hackathon_bring_in_data(wiki_file, kaggle_file, ratings_file)




importing rows 0 to 1000000...Done. 25.442790508270264 total seconds elapsed
importing rows 1000000 to 2000000...Done. 50.14140868186951 total seconds elapsed
importing rows 2000000 to 3000000...Done. 75.24153304100037 total seconds elapsed
importing rows 3000000 to 4000000...Done. 99.81121873855591 total seconds elapsed
importing rows 4000000 to 5000000...Done. 124.50200080871582 total seconds elapsed
importing rows 5000000 to 6000000...Done. 149.4202184677124 total seconds elapsed
importing rows 6000000 to 7000000...Done. 173.97940397262573 total seconds elapsed
importing rows 7000000 to 8000000...Done. 198.95478057861328 total seconds elapsed
importing rows 8000000 to 9000000...Done. 223.603200674057 total seconds elapsed
importing rows 9000000 to 10000000...Done. 248.0964059829712 total seconds elapsed
importing rows 10000000 to 11000000...Done. 273.3096401691437 total seconds elapsed
importing rows 11000000 to 12000000...Done. 297.8901963233948 total seconds elapsed
importing rows