In [1]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

# pull in postgres password
import sys
sys.path.append("C:/Users/laura/bootcamp/")
from api_keys import db_password
import time

import time

In [2]:
#  Add the clean movie function that takes in the argument, "movie".

def clean_movie(movie):
    """Return a cleaned shallow copy of one Wikipedia movie record.

    Foreign-language / alternate title fields are gathered into a single
    ``alternative_titles`` dict, and synonymous column names are folded into
    one canonical name. The ``movie`` mapping passed in is left untouched.
    """
    cleaned = dict(movie)  # work on a copy so the caller's record is not modified

    # Fields that all hold some form of alternate title.
    title_keys = (
        'Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French',
        'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally',
        'Mandarin', 'McCune–Reischauer', 'Original title', 'Polish',
        'Revised Romanization', 'Romanized', 'Russian',
        'Simplified', 'Traditional', 'Yiddish',
    )
    alternative_titles = {}
    for key in title_keys:
        if key in cleaned:
            alternative_titles[key] = cleaned.pop(key)
    if alternative_titles:
        cleaned['alternative_titles'] = alternative_titles

    # (old name, canonical name) pairs, applied in this order. When several old
    # names map to the same canonical name, the last one present wins.
    renames = (
        ('Adaptation by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Original release', 'Release date'),
        ('Music by', 'Composer(s)'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release Date'),
        ('Release Date', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Written by', 'Writer(s)'),
    )
    for old_name, new_name in renames:
        if old_name in cleaned:
            cleaned[new_name] = cleaned.pop(old_name)

    return cleaned

In [3]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def _join_if_list(value):
    """Join a list of strings with single spaces; return any other value unchanged."""
    return ' '.join(value) if isinstance(value, list) else value


def _parse_dollars(s):
    """Convert one extracted dollar string to a float number of dollars.

    Recognizes "$1.2 million", "$1.2 billion" and "$123,456,789" style strings
    (case-insensitive). Anything else, including non-strings, yields NaN.
    """
    if not isinstance(s, str):
        return np.nan

    # $###.# million
    if re.match(r"\$\s*\d+\.?\d*\s*milli?on", s, flags=re.IGNORECASE):
        return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**6

    # $###.# billion
    if re.match(r"\$\s*\d+\.?\d*\s*billi?on", s, flags=re.IGNORECASE):
        return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**9

    # $###,###,###
    if re.match(r"\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)", s, flags=re.IGNORECASE):
        # The pattern accepts "," or "." between digit groups, so both must be
        # stripped; leaving the periods in makes float() fail on "$1.234.567".
        return float(re.sub(r'\$|\s|[,\.]', '', s))

    return np.nan


def _extract_dollars(amounts):
    """Extract the first recognizable dollar amount from each string and parse it."""
    form_one = r"\$\s*\d+\.?\d*\s*[mb]illi?on"                    # e.g. $21.4 million
    form_two = r"\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)"     # e.g. $123,456,789
    matched = amounts.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0]
    return matched.apply(_parse_dollars)


def _clean_wiki_movies(wiki_movies_raw):
    """Filter the raw Wikipedia records and return them as a cleaned DataFrame."""
    # Keep entries that have a director and an IMDb link and are not episodic (TV shows).
    wiki_movies = [movie for movie in wiki_movies_raw
                   if ("Director" in movie or "Directed by" in movie)
                   and "imdb_link" in movie
                   and "No. of episodes" not in movie]
    wiki_movies_df = pd.DataFrame([clean_movie(movie) for movie in wiki_movies])

    # Derive the IMDb ID from the link and keep one row per ID. A failure here is
    # reported (with the actual exception) rather than silently swallowed.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})', expand=False)
        wiki_movies_df = wiki_movies_df.drop_duplicates(subset='imdb_id')
    except Exception as exc:
        print(f"Did not work - no link: {exc!r}")

    # Keep only columns that are less than 90% null. The explicit copy makes the
    # result an independent frame, so the column assignments below are safe.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns
                            if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep].copy()

    # Box office: list values become strings, then dollar amounts are parsed.
    box_office = wiki_movies_df['Box office'].dropna().apply(_join_if_list)
    wiki_movies_df['box_office'] = _extract_dollars(box_office)

    # Budget: collapse ranges ("$1-2 million" -> "$2 million") and strip
    # citation markers such as "[4]" before parsing.
    budget = wiki_movies_df['Budget'].dropna().map(_join_if_list)
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    # regex=True is required: without it newer pandas treats the pattern as a
    # literal string and the markers are left in place.
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    wiki_movies_df['budget'] = _extract_dollars(budget)

    # Release date: pull out the first date-like substring and parse it.
    release_date = wiki_movies_df['Release date'].dropna().apply(_join_if_list)
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    date_form_five = r'\d{2}\s(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_pattern = f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four}|{date_form_five})'
    # NOTE(review): infer_datetime_format is deprecated as of pandas 2.0; revisit
    # this call (and mixed-format handling) when upgrading pandas.
    wiki_movies_df['release_date'] = pd.to_datetime(
        release_date.str.extract(date_pattern)[0], infer_datetime_format=True)

    # Running time: capture "H hours M" (groups 0, 1) or "M m..." (group 2),
    # convert to numbers, and express everything in minutes.
    running_time = wiki_movies_df['Running time'].dropna().apply(_join_if_list)
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(
        lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    from_hours = running_time_extract[0] * 60 + running_time_extract[1]
    minutes_only = running_time_extract[2]
    # Use the plain-minutes capture unless it is zero, then fall back to hours+minutes.
    wiki_movies_df['running_time'] = minutes_only.where(minutes_only != 0, from_hours)

    # The raw string columns are superseded by the parsed ones created above.
    return wiki_movies_df.drop(columns=['Box office', 'Budget', 'Running time'])


def _clean_kaggle_metadata(movies_metadata):
    """Drop adult titles and convert the Kaggle metadata columns to proper dtypes."""
    # Keep rows where the adult column is the string 'False', then drop the column.
    movies_metadata = movies_metadata[movies_metadata['adult'] == 'False'].drop('adult', axis='columns')
    # String 'True'/'False' -> real booleans.
    movies_metadata['video'] = movies_metadata['video'] == 'True'
    # errors='raise' so that unconvertible data is noticed instead of hidden.
    for column in ('budget', 'id', 'popularity'):
        movies_metadata[column] = pd.to_numeric(movies_metadata[column], errors='raise')
    movies_metadata['release_date'] = pd.to_datetime(movies_metadata['release_date'])
    return movies_metadata


def _fill_missing_kaggle_data(df, kaggle_column, wiki_column):
    """Return df with zeros in kaggle_column replaced by wiki_column, which is then dropped."""
    filled = df.drop(columns=wiki_column)
    filled[kaggle_column] = df[kaggle_column].where(df[kaggle_column] != 0, df[wiki_column])
    return filled


def _merge_wiki_and_kaggle(wiki_movies_df, movies_metadata):
    """Merge the cleaned Wikipedia and Kaggle frames into the final movies frame."""
    movies_df = pd.merge(wiki_movies_df, movies_metadata, on='imdb_id',
                         suffixes=('_wiki', '_kaggle'))
    movies_df = movies_df.drop(
        columns=['title_wiki', 'release_date_wiki', 'Language', 'Production company(s)'])

    # Prefer Kaggle values; fall back to Wikipedia where Kaggle has a zero.
    movies_df = _fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    movies_df = _fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    movies_df = _fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # Diagnostic: print the name of every column that holds a single distinct
    # value. Lists are turned into tuples so that value_counts can hash them.
    for column in movies_df.columns:
        hashable = movies_df[column].apply(lambda x: tuple(x) if type(x) == list else x)
        if len(hashable.value_counts(dropna=False)) == 1:
            print(column)

    # Select the columns to keep, in their final order, and give them tidy names.
    movies_df = movies_df.loc[:, [
        'imdb_id', 'id', 'title_kaggle', 'original_title', 'tagline', 'belongs_to_collection', 'url', 'imdb_link',
        'runtime', 'budget_kaggle', 'revenue', 'release_date_kaggle', 'popularity', 'vote_average', 'vote_count',
        'genres', 'original_language', 'overview', 'spoken_languages', 'Country',
        'production_companies', 'production_countries', 'Distributor',
        'Producer(s)', 'Director', 'Starring', 'Cinematography', 'Editor(s)', 'Writer(s)', 'Composer(s)', 'Based on',
    ]]
    return movies_df.rename(columns={
        'id': 'kaggle_id',
        'title_kaggle': 'title',
        'url': 'wikipedia_url',
        'budget_kaggle': 'budget',
        'release_date_kaggle': 'release_date',
        'Country': 'country',
        'Distributor': 'distributor',
        'Producer(s)': 'producers',
        'Director': 'director',
        'Starring': 'starring',
        'Cinematography': 'cinematography',
        'Editor(s)': 'editors',
        'Writer(s)': 'writers',
        'Composer(s)': 'composers',
        'Based on': 'based_on',
    })


def _count_ratings(ratings):
    """Pivot the ratings into one row per movieId with a rating_<value> count column each."""
    rating_counts = (
        ratings.groupby(['movieId', 'rating'], as_index=False).count()
        .rename({'userId': 'count'}, axis=1)
        .pivot(index='movieId', columns='rating', values='count')
    )
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    return rating_counts


def _load_to_postgres(movies_df, ratings_file):
    """Write movies_df to table movies3 and stream ratings_file into table ratings1."""
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
    engine = create_engine(db_string)

    # NOTE(review): to_sql defaults to if_exists='fail', so this raises once
    # movies3 already exists, and the loop below appends, so re-running the
    # notebook duplicates ratings rows. Decide on a replace/truncate policy.
    movies_df.to_sql(name='movies3', con=engine)

    rows_imported = 0
    start_time = time.time()
    # Read the ratings file passed by the caller, in chunks, so the upload does
    # not depend on a notebook-level variable being defined elsewhere.
    for data in pd.read_csv(ratings_file, chunksize=1000000):
        # Report the range of rows being imported; the line is finished below.
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        data.to_sql(name='ratings1', con=engine, if_exists='append')
        rows_imported += len(data)
        print(f'Done. {time.time() - start_time} total seconds elapsed')


def read_files(wiki_file, kaggle_file, ratings_file):
    """Run the movies ETL: extract the three sources, transform, and load to PostgreSQL.

    Args:
        wiki_file: path to the Wikipedia movies JSON file.
        kaggle_file: path to the Kaggle movie metadata CSV file.
        ratings_file: path to the MovieLens ratings CSV file.

    Returns:
        A tuple ``(wiki_movies_df, movies_with_ratings_df, movies_df)``: the
        cleaned Wikipedia frame, the merged movies frame with per-rating counts
        attached, and the merged movies frame.

    Side effects:
        Prints ``ratings.info()``, the names of single-valued merged columns and
        upload progress; writes ``movies_df`` to table ``movies3`` and the raw
        ratings file to table ``ratings1`` in the ``movie_data`` database.
    """
    # Extract.
    movies_metadata = pd.read_csv(kaggle_file)
    ratings = pd.read_csv(ratings_file)
    # Explicit encoding: the default depends on the platform locale.
    with open(wiki_file, mode='r', encoding='utf-8') as file:
        wiki_movies_raw = json.load(file)

    # Transform.
    wiki_movies_df = _clean_wiki_movies(wiki_movies_raw)
    movies_metadata = _clean_kaggle_metadata(movies_metadata)

    # show_counts replaces the null_counts keyword, which newer pandas removed.
    ratings.info(show_counts=True)
    ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')

    movies_df = _merge_wiki_and_kaggle(wiki_movies_df, movies_metadata)

    rating_counts = _count_ratings(ratings)
    movies_with_ratings_df = pd.merge(movies_df, rating_counts,
                                      left_on='kaggle_id', right_index=True, how='left')
    # Not every movie received every rating level; missing counts mean zero.
    movies_with_ratings_df[rating_counts.columns] = \
        movies_with_ratings_df[rating_counts.columns].fillna(0)

    # Load.
    _load_to_postgres(movies_df, ratings_file)

    return wiki_movies_df, movies_with_ratings_df, movies_df


In [4]:
# 10. Source-file locations: one base directory plus the three input files.
file_dir = 'C://Users/Laura/bootcamp/Movies_ETL'
#file_dir = 'C://Users/Laura/bootcamp/Movies_ETL/Resources' use after files put in resource folder

# In order: Wikipedia data (JSON), Kaggle metadata (CSV), MovieLens rating data (CSV).
wiki_file, kaggle_file, ratings_file = (
    f'{file_dir}/{file_name}'
    for file_name in ('wikipedia-movies.json', 'movies_metadata.csv', 'ratings.csv')
)

In [5]:
# 11. Run the ETL function on the three source files.
# The trailing semicolon keeps the notebook from echoing the call's result.
read_files(
    wiki_file=wiki_file,
    kaggle_file=kaggle_file,
    ratings_file=ratings_file,
);

  


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 26024289 entries, 0 to 26024288
Data columns (total 4 columns):
 #   Column     Non-Null Count     Dtype  
---  ------     --------------     -----  
 0   userId     26024289 non-null  int64  
 1   movieId    26024289 non-null  int64  
 2   rating     26024289 non-null  float64
 3   timestamp  26024289 non-null  int64  
dtypes: float64(1), int64(3)
memory usage: 794.2 MB
video
importing rows 0 to 1000000...Done. 22.8388934135437 total seconds elapsed
importing rows 1000000 to 2000000...Done. 45.291752338409424 total seconds elapsed
importing rows 2000000 to 3000000...Done. 67.7077226638794 total seconds elapsed
importing rows 3000000 to 4000000...Done. 89.89376473426819 total seconds elapsed
importing rows 4000000 to 5000000...Done. 112.49218392372131 total seconds elapsed
importing rows 5000000 to 6000000...Done. 135.19526147842407 total seconds elapsed
importing rows 6000000 to 7000000...Done. 157.54089188575745 total seconds elapsed
