In [6]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [7]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    movie = dict(movie) #create a non-destructive copy
    alt_titles = {}
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune–Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles
    
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name('Directed by', 'Director')
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')
    
    return movie

In [8]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def dataframe_maker(wiki,kagg,rating):
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(kagg, low_memory=False)
    ratings = pd.read_csv(rating)

    # Open the read the Wikipedia data JSON file.
    with open(wiki, mode='r') as file:
        wiki_movies_raw = json.load(file)
    
    # 3. Write a list comprehension to filter out TV shows.
    wiki_movies = [movie for movie in wiki_movies_raw
               if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]
    # Print to test
    #print(len(wiki_movies))
    
    # 4. Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    clean_movies = [clean_movie(movie) for movie in wiki_movies]
   
    
    # 5. Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(clean_movies)
    
    # print to test
    # print(sorted(wiki_movies_df.columns.tolist()))
    
    # 6. Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try: 
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)        
    except KeyError as err:
        print(f'The movie does not have a proper link:', err)
        pass 
        
    #  7. Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]

    # 8. Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
    # print to test
    # print(len(box_office))
    
    # 9. Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

    # 10. Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on' 
    
    # 11. Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'

    # 12. Add the parse_dollars function.
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            # return value
            return value

        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value

        # otherwise, return NaN
        else:
            return np.nan
    
    # 13. Clean the box office column in the wiki_movies_df DataFrame.
    #Some values are given as a range.Search for any string that starts with $ and ends with -, and then replace it with $
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    # we have good info in the box_office column, so we don't need 'Box office' anymore with its messy data
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    
    # 14. Clean the budget column in the wiki_movies_df DataFrame.
    # create budget and drop NaN
    budget = wiki_movies_df['Budget'].dropna()
    
    # convert lists to strings
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    
    # remove values between a dollar sign and a hyphen (ranges of values)
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE, na=False)
    matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE, na=False)
    
    # remove bracketed citation references
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    
    # move everything into a nice new column
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    # Take out the old column-- ACTUALLY the example in the challenge shows that we should keep this column!
    # wiki_movies_df.drop('Budget', axis=1, inplace=True)
    
    
    # 15. Clean the release date column in the wiki_movies_df DataFrame.
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    #all these forms for date
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}' 
    
    #extract dates
    release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)
    
    # put the good dates into a new column and use Pandas magic to realize they are all dates in dif. forms
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)
     
        
    # 16. Clean the running time column in the wiki_movies_df DataFrame.
    # parse running time in a new data frame
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    # make our capture more flexible about abbrevs etc.
    running_time.str.contains(r'^\d*\s*m', flags=re.IGNORECASE, na=False).sum()
    
    # get the reamaining times that have time in hours (previously we were only capturing minutes)
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    
    # convert all to numbers and coerce the empty strings into NaN so we can fill with '0'
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    
    # calculate minutes for all and put it into the main df in one step
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    
    # drop the old column
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
    
     
    # 2. Clean the Kaggle metadata.
    
    # first, drop anything that doesn't have False for adult.
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')
    
    # Convert 'video' to Boolean
    kaggle_metadata['video'] == 'True'
    # add 'video' back to the original column
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    
    # convert the numeric columns and raise errors if they occur
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    
    # convert release date to datetime with pandas
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
    
    # 3. Merged the two DataFrames into the movies DataFrame.
    # When we look for redundant column names, suffixes will help us know where the data came from
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    # First drop the Wiki data where needed
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

    # 5. Add in the function to fill in the missing Kaggle data.
    # create a function that compares data and fills in missing
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
            , axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    # run the finction on the columns where we decided to fill in using Wiki (can't run this twice because it drops the wiki column)
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # 7. Filter the movies DataFrame for specific columns.
    # convert lists to tuples for value counts to work 
    # send us the column name of columns that have only one value
    for col in movies_df.columns:
        lists_to_tuples = lambda x: tuple(x) if type(x) == list else x
        value_counts = movies_df[col].apply(lists_to_tuples).value_counts(dropna=False)
        num_values = len(value_counts)
        if num_values == 1:
            movies_df[col].value_counts(dropna=False)
    
    # 8. Rename the columns in the movies DataFrame.
    # order the columns logically 
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]
    
    movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

    # 9. Transform and merge the ratings DataFrame.
    # Firstly, the date is in unix seconds, and we can convert it
    pd.to_datetime(ratings['timestamp'], unit='s')
    
    # the timestamps seem reasonable, so we can assign it back to the column
    ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')
    
    # Reduce the ratings data
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count()
    
    # use the userID column to count frequencies of each rating per movie 
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1)
    
    # We can pivot this data so that movieId is the index, 
    # the columns will be all the rating values, 
    # and the rows will be the counts for each rating value.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')
    
    # rename columns with list comprehension
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    
    # do a left merge to keep all the movie data and put in ratings where they exist
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    
    # fill in NaNs with zeros
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)
    
    # Step 3 of Deliverable 4: add the code to create the connection to the PostgreSQL database
    # then add the movies_df DataFrame to a SQL database.
    # Pandas has a way to do that with a database engine
    !pip install psycopg2-binary
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
    engine = create_engine(db_string)
    # now save the file to SQL
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')
    
    # Step 4 of Deliverable 4: Import Ratings Data In Chunks
    # Remember to drop the ratings table in pgAdmin so you don't append even more rows

    # Step 5 of Deliverable 4: get the start_time from time.time()
    start_time = time.time()

    # create a variable for the number of rows imported
    rows_imported = 0
    for data in pd.read_csv(f'{file_dir}/ratings.csv', chunksize=1000000):

        # print out the range of rows that are being imported
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

        data.to_sql(name='ratings', con=engine, if_exists='append')

        # increment the number of rows imported by the size of 'data'
        rows_imported += len(data)

        # Step 5 of Deliverable 4, continued: add elapsed time to final print out
        # print that the rows have finished importing
        print(f'Done. {time.time() - start_time} total seconds elapsed')

In [9]:
## NOTE: go to pgAdmin and drop the table for ratings before running these next parts

# Step 10 of Deliverable 3: Create the path to your file directory and variables for the three files.
file_dir = 'C:/Users/Sara/Class/Movies-ETL/Resources'
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia-movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [10]:
# Step 6 and 7 of Deliverable 4: Set the three variables equal to the function created in D1, then run.
wiki_file, kaggle_file, ratings_file = dataframe_maker(wiki_file, kaggle_file, ratings_file)

  budget = budget.str.replace(r'\[\d+\]\s*', '')


importing rows 0 to 1000000...Done. 26.648436069488525 total seconds elapsed
importing rows 1000000 to 2000000...Done. 51.01263666152954 total seconds elapsed
importing rows 2000000 to 3000000...Done. 75.76265120506287 total seconds elapsed
importing rows 3000000 to 4000000...Done. 100.59846806526184 total seconds elapsed
importing rows 4000000 to 5000000...Done. 126.30175948143005 total seconds elapsed
importing rows 5000000 to 6000000...Done. 152.2184660434723 total seconds elapsed
importing rows 6000000 to 7000000...Done. 176.920241355896 total seconds elapsed
importing rows 7000000 to 8000000...Done. 202.3472557067871 total seconds elapsed
importing rows 8000000 to 9000000...Done. 226.77920413017273 total seconds elapsed
importing rows 9000000 to 10000000...Done. 251.5893738269806 total seconds elapsed
importing rows 10000000 to 11000000...Done. 277.4153528213501 total seconds elapsed
importing rows 11000000 to 12000000...Done. 303.9846248626709 total seconds elapsed
importing rows

TypeError: cannot unpack non-iterable NoneType object

In [None]:
# The code worked and the SQL table for ratings has the correct row count even though there is an error above. 