In [11]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [12]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    movie = dict(movie) #create a non-destructive copy
    alt_titles = {}
    return movie   

In [13]:
file_dir = 'C://Users/sergi/Documents/1_UTBootcam/Module_8/Movies-ETL/'

In [14]:
# 2 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load():
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(f'{file_dir}movies_metadata.csv', low_memory=False)
    ratings = pd.read_csv(f'{file_dir}ratings.csv')
    
    # Open the read the Wikipedia data JSON file.
    f'{file_dir}wikipedia-movies.json'
    with open(f'{file_dir}/wikipedia-movies.json', mode='r') as file:
        wiki_movies_raw = json.load(file)
    

    # 3. Write a list comprehension to filter out TV shows.
    wiki_tvshows = [tv_shows for tv_shows in wiki_movies_raw 
        if 'Television series' in tv_shows]   
    
    # 4. Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    clean_movies = [clean_movie(movie) for movie in wiki_movies_raw]

    # 5. Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(clean_movies)

    # 6. Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
       
    except:
        print("error extrating IMDb ID")

    #  7. Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]
    
    # 8. Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
    
    # 9. Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office[box_office.map(lambda x: type(x) != str)]


    # 10. Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$\d+\.?\d*\s*[mb]illion'
    # 11. Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$\d{1,3}(?:,\d{3})+'

    # 12. Add the parse_dollars function.
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
                return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            # return value
            return value

        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value

        # otherwise, return NaN
        else:
            return np.nan
    
        
    # 13. Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    # 14. Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df['Budget'].dropna()
    
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    
    matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE, na=False)
    matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE, na=False)
    budget[~matches_form_one & ~matches_form_two]
    
    #Remove the citation references
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    budget[~matches_form_one & ~matches_form_two]
    
    # Parse the budget; copy the line of code we used to parse the box office values, changing "box_office" to "budget"
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    #drop the original Budget column
    wiki_movies_df.drop('Budget', axis=1, inplace=True)
        
    # 15. Clean the release date column in the wiki_movies_df DataFrame.
    #Parse "Release Date"make a variable that holds the non-null values of Release date in the DataFrame, converting lists to strings
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    #Write regular expressions for each date form
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    
    #extract the dates
    release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)
    
    # Use the built-in to_datetime() method in Pandas. Since there are different date formats, set the infer_datetime_format 
    # option to True
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)
    
    # 16. Clean the running time column in the wiki_movies_df DataFrame.
    #  Make a variable that holds the non-null values of "Running Time" date in the DataFrame, converting lists to strings
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    
    # Apply a function that will convert the hour capture groups and minute capture groups to minutes if the pure minutes
    # capture group is zero, and save the output to wiki_movies_df
    
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    
    # drop Running time from the dataset
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
    
# 2. Clean the Kaggle metadata.
    # check that all the values are either True or False.
    kaggle_metadata[~kaggle_metadata['adult'].isin(['True','False'])]
    
    #The following code will keep rows where the adult column is False, and then drop the adult column
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')
    
    #  convert Video entries to Boolean
    kaggle_metadata['video'] == 'True'
    
    #The above code creates the Boolean column we want. We just need to assign it back to video
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    
    # For the numeric columns, use the to_numeric() Pandas method with errors= argument is set to 'raise' to ID data that can't 
    # be converted to numbers
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    
    # release_date standard format to_datetime()
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
    
    #specify in to_datetime() that the origin is 'unix' and the time unit is seconds.
    pd.to_datetime(ratings['timestamp'], unit='s')
    
    #Since the output looks reasonable, assign it to the timestamp column.
    ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')
    
    # Merge the two DataFrames into the movies DataFrame
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])
       
    movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)
    
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language', ], inplace=True)
   

    # fills in missing data for a column pair and then drops the redundant column
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column], axis=1)
        df.drop(columns=wiki_column, inplace=True)
    
    #Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')
    
    #Filter the movies DataFrame for specific columns.
    for col in movies_df.columns:
        lists_to_tuples = lambda x: tuple(x) if type(x) == list else x
        value_counts = movies_df[col].apply(lists_to_tuples).value_counts(dropna=False)
        num_values = len(value_counts)
    if num_values == 1:
        print(col)
      
    
    # reorder the columns
    # 8. Rename the columns in the movies DataFrame.
    column_names = ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                        'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                        'genres','original_language','overview','spoken_languages','Country',
                        'production_companies','production_countries','Distributor',
                        'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                       ]
    movies_df = movies_df.reindex(columns=column_names)

    
    # rename the columns to be consistent
    movies_df.rename({'id':'kaggle_id',
                      'title_kaggle':'title',
                      'url':'wikipedia_url',
                      'budget_kaggle':'budget',
                      'release_date_kaggle':'release_date',
                      'Country':'country',
                      'Distributor':'distributor',
                      'Producer(s)':'producers',
                      'Director':'director',
                      'Starring':'starring',
                      'Cinematography':'cinematography',
                      'Editor(s)':'editors',
                      'Writer(s)':'writers',
                      'Composer(s)':'composers',
                      'Based on':'based_on'
                     }, axis='columns', inplace=True)
    #use a groupby on the "movieId" and "rating" columns and take the count for each group.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count()
    # rename the "userId" column to "count."
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1)
    # pivot this data so that movieId is the index, the columns will be all the rating values, and the rows will
    # be the counts for each rating value.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1) \
                    .pivot(index='movieId',columns='rating', values='count')
    # rename the columns so they're easier to understand. We'll prepend rating_ to each column with a list comprehension
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    # use a left merge to merge the rating counts into movies_df
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    # not every movie got a rating for each rating level, there will be missing values instead of zeros. fill those
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)


    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"

    engine = create_engine(db_string)
    
    movies_df.to_sql(name='movies', con=engine)

    #create a variable for the number of rows imported
    rows_imported = 0
    # get the start_time from time.time()
    start_time = time.time()
    for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        data.to_sql(name='ratings', con=engine, if_exists='append')
        rows_imported += len(data)

        #add elapsed time to final print out
        print(f'Done. {time.time() - start_time} total seconds elapsed')

In [15]:
# 10. Create the path to your file directory and variables for the three files.
#file_dir = 'C://Users/sergi/Documents/1_UTBootcam/Module_8/Movies-ETL/'

# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia.movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [16]:
# 11. Set the three variables equal to the function created in D1.
wiki_file, kaggle_file, ratings_file = extract_transform_load()



importing rows 0 to 1000000...Done. 28.235391855239868 total seconds elapsed
importing rows 1000000 to 2000000...Done. 57.69222640991211 total seconds elapsed
importing rows 2000000 to 3000000...Done. 87.17433071136475 total seconds elapsed
importing rows 3000000 to 4000000...Done. 117.63561987876892 total seconds elapsed
importing rows 4000000 to 5000000...Done. 147.95747351646423 total seconds elapsed
importing rows 5000000 to 6000000...Done. 177.96840906143188 total seconds elapsed
importing rows 6000000 to 7000000...Done. 208.75805163383484 total seconds elapsed
importing rows 7000000 to 8000000...Done. 242.50498366355896 total seconds elapsed
importing rows 8000000 to 9000000...Done. 272.06476068496704 total seconds elapsed
importing rows 9000000 to 10000000...Done. 302.13103222846985 total seconds elapsed
importing rows 10000000 to 11000000...Done. 336.3561546802521 total seconds elapsed
importing rows 11000000 to 12000000...Done. 368.04346203804016 total seconds elapsed
importin

TypeError: cannot unpack non-iterable NoneType object

In [None]:
# 12. Set the DataFrames from the return statement equal to the file names in Step 11. 
wiki_movies_df = wiki_file
movies_with_ratings_df = kaggle_file
movies_df = ratings_file

In [None]:
# 13. Check the wiki_movies_df DataFrame. 
wiki_movies_df.head()

In [None]:
# 14. Check the movies_with_ratings_df DataFrame.
movies_with_ratings_df.head()

In [None]:
# 15. Check the movies_df DataFrame. 
movies_df.head()