In [None]:
import json
import pandas as pd
import numpy as np

import re
from sqlalchemy import create_engine
import psycopg2

import time

from config import db_password

## Deliverable 1

In [None]:
# 1. Create a function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load():
    
    # 2. Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(f'{file_dir}movies_metadata.csv', low_memory=False)
    ratings = pd.read_csv(f'{file_dir}ratings.csv')
    
    # 3. Open the read the Wikipedia data JSON file.
    with open(f'{file_dir}wikipedia-movies.json', mode = 'r') as file:
    
    # 4. Read in the raw wiki movie data as a Pandas DataFrame.
        wiki_movies_raw = json.load(file)
    
    # 5. Return the three DataFrames
    return wiki_movies_df, kaggle_metadata, ratings

# 6 Create the path to your file directory and variables for the three files. 
file_dir = 'C://Users/maron/Class/Movies-ETL/'

# Wikipedia data
wiki_file = f'{file_dir}/wikipedia_movies.json'

# Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'

# MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

# 7. Set the three variables in Step 6 equal to the function created in Step 1.
wiki_file, kaggle_file, ratings_file = extract_transform_load()

In [None]:
# 8. Set the DataFrames from the return statement equal to the file names in Step 6. 
wiki_movies_raw = wiki_file
kaggle_metadata = kaggle_file
ratings = ratings_file

In [None]:
# 9. Check the wiki_movies_df DataFrame.
wiki_movies_df = pd.DataFrame(wiki_movies_raw)
wiki_movies_df.head()

In [None]:
# 10. Check the kaggle_metadata DataFrame.
kaggle_metadata.head()

In [None]:
# 11. Check the ratings DataFrame.
ratings.head()

## DELIVERABLE 2

In [None]:
# 1. Add the clean movie function that takes in the argument, "movie".

def clean_movie(movie):
    movie = dict(movie)  # Create a non-destructive copy of movie to be referred to
    # Make empty dict to hold alt titles
    alt_titles = {}
    # Loop through the list of alt title keys
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune–Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
    # Check if current key exists in movie object
        if key in movie:
            # If so, remove key-value pair and add to alt titles dict
            alt_titles[key] = movie[key]
            movie.pop(key)
    #  After looping through every key, add the alternative titles dict to the movie object.
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles
    
    
    return movie

In [None]:
# 3. Write a list comprehension to filter out TV shows.
wiki_movies_only = [movie for movie in wiki_movies_raw if 'No. of episodes' not in movie]

wiki_movies_only

In [None]:
# 4. Write a list comprehension to iterate through the cleaned wiki movies list
# and call the clean_movie function on each movie.

clean_movies = [clean_movie(movie) for movie in wiki_movies_only]

clean_movies

In [None]:
# 5. Read in the cleaned movies list from Step 4 as a DataFrame.

wiki_movies_df = pd.DataFrame(clean_movies)

wiki_movies_df.sample(5)

In [None]:
# 6. Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
#  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
try:
    wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
    wiki_movies_df.drop_duplicates(subset='imdb_id', inplace = True)
except:
    print(f'Error extracting IMDb ID for {title}')

In [None]:
print(len(wiki_movies_df))

In [None]:
#  7. Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.

wiki_columns_to_keep = [column for column in wiki_movies_df.columns if 
                        wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]

wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]      

wiki_movies_df.head()

In [None]:
# 8. Create a variable that will hold the non-null values from the “Box office” column.

box_office = wiki_movies_df['Box office'].dropna()

In [None]:
# 9. Convert the box office data created in Step 8 to string values using the lambda and join functions.

box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

In [None]:
# 10. Write a regular expression to match the six elements of "form_one" of the box office data.
form_one = r'\$\s*\d+\.?\d*\s*[mb]illion'

# 11. Write a regular expression to match the three elements of "form_two" of the box office data.
form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'


In [None]:
# 12. Add the parse_dollars function.

# Create function parse_dollars to turn extracted values into numeric values

def parse_dollars(s):
    # if s is not a string, return NaN
    
    if type(s) != str:
        return np.nan
        
    # if input is of the form $###.# million
    if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
        

        # remove dollar sign and " million"
        s = re.sub('\$|\s|[a-zA-Z]', '', s)
        
        # convert to float and multiply by a million
        value = float(s) * 10**6

        # return value
        return value

    # if input is of the form $###.# billion
    elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

        # remove dollar sign and " billion"
        s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a billion
        value = float(s) * 10**9
        
        # return value
        return value

    # if input is of the form $###,###,###
    elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

        # remove dollar sign and commas
        s = re.sub('\$|,','', s)

        # convert to float
        value = float(s)

        # return value
        return value

    # otherwise, return NaN
    else:
        return np.nan

In [None]:
# 13. Clean the box office column in the wiki_movies_df DataFrame.

wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

In [None]:
# 14. Clean the budget column in the wiki_movies_df DataFrame.

# PARSE BUDGET DATA 
budget = wiki_movies_df['Budget'].dropna()

# Convert any lists to string
budget = budget.map(lambda x: ' '.join(x) if type (x) == list else x)

# Remove values between a dollar sign adn hyphen
budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)

matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE, na=False)
matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE, na=False)
budget[~matches_form_one & ~matches_form_two]

# Remove citation references 
budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
budget[~matches_form_one & ~matches_form_two]

# Clean budget data column
wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
wiki_movies_df.drop('Budget', axis=1, inplace=True)

In [None]:
# 15. Clean the release date column in the wiki_movies_df DataFrame.

# PARSE RELEASE DATE
release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
date_form_two = r'\d{4}.[01]\d.[0123]\d'
date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
date_form_four = r'\d{4}'

release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)

wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)


In [None]:
# 16. Clean the running time column in the wiki_movies_df DataFrame.

running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

running_time[running_time.str.contains(r'^\d*\s*minutes$', flags=re.IGNORECASE, na=False) != True]
running_time[running_time.str.contains(r'^\d*\s*m', flags=re.IGNORECASE, na=False) != True]

running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')

running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)

wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)

# Drop running time
wiki_movies_df.drop('Running time', axis=1, inplace=True)

In [None]:
wiki_movies_df.head()

In [None]:
# 17. Create the path to your file directory and variables for the three files.
file_dir = 'C://Users/maron/Class/Movies-ETL/'
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia_movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [None]:
# 18. Set the three variables equal to the function created in D1.

wiki_file, kaggle_file, ratings_file = extract_transform_load()

In [None]:
# 19. Set the wiki_movies_df equal to the wiki_file variable. 
wiki_movies_df = wiki_file

In [None]:
# 20. Check that the wiki_movies_df DataFrame looks like this. 
wiki_movies_df.head()

In [None]:
# 21. Check that wiki_movies_df DataFrame columns are correct. 
wiki_movies_df.columns.to_list()

## DELIVERABLE 3

In [None]:
# 2. Clean Kaggle metadata

kaggle_metadata[~kaggle_metadata['adult'].isin(['True','False'])]
kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')

kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

In [None]:
# Reformat ratings to datetime

ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')

In [None]:
# 3a. Merge the two dataframes into the movies dataframe

movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

movies_df.head()

In [None]:
# Running time vs run time

movies_df.fillna(0).plot(x='running_time', y='runtime', kind='scatter')

In [None]:
# 4. Drop unnecessary columns from the merged DataFrame.

movies_df.drop(columns=['title_wiki','release_date_wiki','Language'], inplace=True)

In [None]:
# 5. Add in the function to fill in the missing Kaggle data.

def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
    df[kaggle_column] = df.apply(
        lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
        , axis=1)
    df.drop(columns=wiki_column, inplace=True)

In [None]:
# 6. Call the function in Step 5 with the DataFrame and columns as the arguments.

fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')
movies_df.head()

In [None]:
# 8. Rename the columns in the movies DataFrame.

movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

In [None]:
# 9. Transform and merge ratings data

rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')

rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')

In [None]:
movies_with_ratings_df.head()

In [None]:
# 10. Create the path to your file directory and variables for the three files.
file_dir = 'C://Users/maron/Class/Movies-ETL/'
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia_movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [None]:
# 11. Set the three variables equal to the function created in D1.
wiki_file, kaggle_file, ratings_file = extract_transform_load()

In [None]:
# 12. Set the DataFrames from the return statement equal to the file names in Step 11. 
wiki_movies_df = wiki_file
movies_with_ratings_df = kaggle_file
movies_df = ratings_file

In [None]:
# 13. Check the wiki_movies_df DataFrame. 
wiki_movies_df.head()

In [None]:
# 14. Check the movies_with_ratings_df DataFrame.
movies_with_ratings_df.head()

In [None]:
# 15. Check the movies_df DataFrame. 
movies_df.head()

## DELIVERABLE 4

In [None]:
# Create connection string

db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"

In [None]:
engine = create_engine(db_string)

In [None]:
# import movie data to SQL

movies_df.to_sql(name='movies', con=engine, if_exists = 'replace')

In [10]:
# create a variable for the number of rows imported
rows_imported = 0

#Get the start_time from time.time()
start_time = time.time()

for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):

    # print out the range of rows that are being imported
    print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

    data.to_sql(name='ratings', con=engine, if_exists='append')

    # increment the number of rows imported by the size of 'data'
    rows_imported += len(data)

    # print that the rows have finished importing
    print(f'Done. {time.time() - start_time} total seconds elapsed')

importing rows 0 to 1000000...Done. 20.088253021240234 total seconds elapsed
importing rows 1000000 to 2000000...Done. 40.03088045120239 total seconds elapsed
importing rows 2000000 to 3000000...Done. 59.48308849334717 total seconds elapsed
importing rows 3000000 to 4000000...Done. 78.53395056724548 total seconds elapsed
importing rows 4000000 to 5000000...Done. 99.3723475933075 total seconds elapsed
importing rows 5000000 to 6000000...Done. 119.51464700698853 total seconds elapsed
importing rows 6000000 to 7000000...Done. 141.33870768547058 total seconds elapsed
importing rows 7000000 to 8000000...Done. 162.35310673713684 total seconds elapsed
importing rows 8000000 to 9000000...Done. 183.4841766357422 total seconds elapsed
importing rows 9000000 to 10000000...Done. 205.08792448043823 total seconds elapsed
importing rows 10000000 to 11000000...Done. 226.7770357131958 total seconds elapsed
importing rows 11000000 to 12000000...Done. 248.86578488349915 total seconds elapsed
importing ro