# Wikipedia Movies ETL Challenge

In [1]:
import json
import pandas as pd
import numpy as np
import re
from sqlalchemy import create_engine
from config import db_password
import time

# Inputs:
    1. wikipedia.movies.json
        scraped JSON of wikipedia movie data
    2. movies_metadata.csv
        Kaggle dataset of movies, and movie dataset from The Movie Database (TMDb)
    3. ratings.csv
        Dataset of all TMDb movie reviews (over 26 million reviews)|

In [2]:
#File directory location that contains inputs:
file_dir = r"C:\Users\Nathan\Documents\Data Bootcamp\Movies_ETL\Resources"

#Read in inputs:
with open(f'{file_dir}/wikipedia.movies.json', mode='r') as file:
    wiki_movies_raw = json.load(file)
    
kaggle_metadata = pd.read_csv(f'{file_dir}/movies_metadata.csv')
ratings = pd.read_csv(f'{file_dir}/ratings_sample.csv')

  interactivity=interactivity, compiler=compiler, result=result)


In [3]:


def movies_etl(wiki_movies_raw,kaggle_metadata, ratings):
    
#Read Inputs:

    #Create a Pandas DF from raw JSON
    wiki_movies_df = pd.DataFrame(wiki_movies_raw)
    
    #Grab movies where there is a director/directed by and is a movie
    wiki_movies = [movie for movie in wiki_movies_raw
                   if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]
    
    #clean_movie function that searches for alternate title name of the movie in language columns.
    #If Alt name found append to a list for title name, and then pop 
    def clean_movie(movie):
        movie = dict(movie) #create a non-destructive copy
        alt_titles = {}
        for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                    'Hangul','Hebrew','Hepburn','Japanese','Literally',
                    'Mandarin','McCune–Reischauer','Original title','Polish',
                    'Revised Romanization','Romanized','Russian',
                    'Simplified','Traditional','Yiddish']:
            if key in movie:
                alt_titles[key] = movie[key]
                movie.pop(key)
        if len(alt_titles) > 0:
            movie['alt_titles'] = alt_titles
    
        #Function to alter column names. We have a lot of duplicated columns, so lets try to combine them using pop
        def change_column_name(old_name,new_name):
            if old_name in movie:
                movie[new_name] = movie.pop(old_name)

        change_column_name('Adaptation by', 'Writer(s)')
        change_column_name('Country of origin', 'Country')
        change_column_name('Directed by', 'Director')
        change_column_name('Distributed by', 'Distributor')
        change_column_name('Edited by', 'Editor(s)')
        change_column_name('Length', 'Running time')
        change_column_name('Original release', 'Release date')
        change_column_name('Music by', 'Composer(s)')
        change_column_name('Produced by', 'Producer(s)')
        change_column_name('Producer', 'Producer(s)')
        change_column_name('Productioncompanies ', 'Production company(s)')
        change_column_name('Productioncompany ', 'Production company(s)')
        change_column_name('Released', 'Release Date')
        change_column_name('Release Date', 'Release date')
        change_column_name('Screen story by', 'Writer(s)')
        change_column_name('Screenplay by', 'Writer(s)')
        change_column_name('Story by', 'Writer(s)')
        change_column_name('Theme music composer', 'Composer(s)')
        change_column_name('Written by', 'Writer(s)')
    
        return movie
    
    #Clean movies becomes our wiki_movies_df
    wiki_movies_df = pd.DataFrame([clean_movie(movie) for movie in wiki_movies])
    
    #Populate imdb_ID IF data issue with imdb_link contains the id field. Use RegEx
    wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
    
    #Get imdb_id as UNIQUE. Drop duplicates
    wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    
    #Only want columns that are less than 90% null values.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]
    
    #Handle and sanitize Box Office Input
    #Grab all entries where Box Office is not NaN
    box_office = wiki_movies_df['Box office'].dropna()
    #RegEx Expreissions for Box Office value inputs. 
    
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
    #Parse_dollars function that will take and sanitize input to convert to FLOAT
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan
        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)
            # convert to float and multiply by a million
            value = float(s) * 10**6
            # return value
            return value
        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):
            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)
            # convert to float and multiply by a billion
            value = float(s) * 10**9
            # return value
            return value
        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):
            # remove dollar sign and commas
            s = re.sub('\$|,','', s)
            # convert to float
            value = float(s)
            # return value
            return value
        # otherwise, return NaN
        else:
            return np.nan
    #Apply formula to all values matching RegEx Expressions for box office    
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    #Drop original column
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    #Repeat sanitize steps for budget:
    budget = wiki_movies_df['Budget'].dropna()
    
    # Convert any lists to strings:
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    #Remove any values between a dollar sign and a hyphen
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    #Remove any square brackets in our values
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    
    #Apply formula to all values matching RegEx Expressions for budget    
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    #can drop the old 'Budget' Column since we have a new and updated one. 
    wiki_movies_df.drop('Budget', axis=1, inplace=True)

    
    #Convert release date from UNIX to datetime:
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    #Date forms to be parsed through:
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    #Pandas to_datetime 
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)

    # Running Time
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    # RegEx expression to sanitize all values. Extract out full minutes, and hours from string
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    #Apply formula to all values matching RegEx Expressions for running time
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    # Drop the old column
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
    
    
#KAGGLE MOVIES METADATA
    # Keep only non-adult movies from Kaggle Dataset
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')
    #Convert the rest of the mismatched column types to numeric:
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')

    #Convert this to to_datetime using pandas:
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

    
#RATINGS
    #Convert unix timestamp to datetime
    ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')
    
    
#MERGING DATA TOGETHER
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])
    
    #Drop this one movie because the release date is off:
    try:
        movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)
    except: 
        pass
    #Drop these columns from the dataset. We'll use the Kaggle version of these columns/don't need these columns
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)
    
    #function that fills in missing data for a column pair, and then drops the redundant column
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
            , axis=1)
        df.drop(columns=wiki_column, inplace=True)
    
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    #Reordering the df to make more sense
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                           'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                           'genres','original_language','overview','spoken_languages','Country',
                           'production_companies','production_countries','Distributor',
                           'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                          ]]

    #rename the columns to be consistent:
    movies_df.rename({'id':'kaggle_id',
                      'title_kaggle':'title',
                      'url':'wikipedia_url',
                      'budget_kaggle':'budget',
                      'release_date_kaggle':'release_date',
                      'Country':'country',
                      'Distributor':'distributor',
                      'Producer(s)':'producers',
                      'Director':'director',
                      'Starring':'starring',
                      'Cinematography':'cinematography',
                      'Editor(s)':'editors',
                      'Writer(s)':'writers',
                      'Composer(s)':'composers',
                      'Based on':'based_on'
                     }, axis='columns', inplace=True)    
    
##RATINGS APPEND
    #Count number of reviews per Movie ID. 
    #Rename the column UserID to Count. Doesn't matter that we rename userID it has arbitrary data in it for a count
    #Pivot the data so that the index is MovieID, columns will be rating values, and rows will be the counts for each rating value.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1) \
                    .pivot(index='movieId',columns='rating', values='count')

    #Prepend the rating_ to each column
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

    #Merge ratings data
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')

    # Fill in 0's for missing values and we good:
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    return movies_df
    
    
    

In [4]:
movies_df = movies_etl(wiki_movies_raw,kaggle_metadata, ratings)

## SQL Import

In [5]:

db_string = f"postgres://postgres:{db_password}@127.0.0.1:5432/movie_data"
engine = create_engine(db_string)

start_time = time.time()
try:
    engine.execute('DELETE FROM movies;')
    engine.execute('commit;')
    print('movies_df table data TRUNCATED.')
except:
    pass
movies_df.to_sql(name = 'movies',con = engine,if_exists = 'append')
print(f'movies_df Done. {time.time() - start_time} total seconds elapsed')


#Ratings Data full import
rows_imported = 0
start_time = time.time()

try: 
    engine.execute('DELETE FROM ratings;')
    engine.execute('commit;')
except: 
    pass
for data in pd.read_csv(f'{file_dir}/ratings.csv', chunksize=1000000):
    # print out the range of rows that are being imported
    print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
    data.to_sql(name='ratings', con=engine, if_exists='append',method = 'multi')
    # increment the number of rows imported by the size of 'data'
    rows_imported += len(data)
    # print that the rows have finished importing
    print(f'Done. {time.time() - start_time} total seconds elapsed')

movies_df table data TRUNCATED.
movies_df Done. 1.6535015106201172 total seconds elapsed
importing rows 0 to 1000000...Done. 138.03849864006042 total seconds elapsed
importing rows 1000000 to 2000000...Done. 231.5364739894867 total seconds elapsed
importing rows 2000000 to 3000000...Done. 322.5301375389099 total seconds elapsed
importing rows 3000000 to 4000000...Done. 414.3806154727936 total seconds elapsed
importing rows 4000000 to 5000000...Done. 504.58860087394714 total seconds elapsed
importing rows 5000000 to 6000000...Done. 597.2856662273407 total seconds elapsed
importing rows 6000000 to 7000000...Done. 687.3074865341187 total seconds elapsed
importing rows 7000000 to 8000000...Done. 780.1738848686218 total seconds elapsed
importing rows 8000000 to 9000000...Done. 873.4825332164764 total seconds elapsed
importing rows 9000000 to 10000000...Done. 966.6141765117645 total seconds elapsed
importing rows 10000000 to 11000000...Done. 1059.39750289917 total seconds elapsed
importing r