In [2]:
#ETL module, where we take in dirty json formated data, clean it, and put it into dataframe 

#setting up dependencies
import json
import pandas as pd
import numpy as np
import re
from sqlalchemy import create_engine
from config import db_password
import time

In [3]:
# syntax for connect "postgres://[user]:[password]@[location]:[port]/[database]"
db_string = f"postgres://postgres:{db_password}@127.0.0.1:5433/movie_data"

#set up sqlalchemy engine 
engine = create_engine(db_string)

In [4]:
#setting the file path and directory
file_dir = 'C:/Users/tienl/Desktop/school/Modules/Movies-ETL/'

In [5]:
#checking file name in a string with path
f'{file_dir}filename'

'C:/Users/tienl/Desktop/school/Modules/Movies-ETL/filename'

In [6]:
#opening the json that we want, and loading it into a list of dictionaries 
with open(f'{file_dir}wikipedia.movies.json', mode='r') as file:
    wiki_movies_raw = json.load(file)

#checking the length of it
len(wiki_movies_raw)


7311

In [7]:
# First 5 [:5], last [-5:], and middle[3600:3605]
wiki_movies_raw[:5]

[{'url': 'https://en.wikipedia.org/wiki/The_Adventures_of_Ford_Fairlane',
  'year': 1990,
  'imdb_link': 'https://www.imdb.com/title/tt0098987/',
  'title': 'The Adventures of Ford Fairlane',
  'Directed by': 'Renny Harlin',
  'Produced by': ['Steve Perry', 'Joel Silver'],
  'Screenplay by': ['David Arnott', 'James Cappe', 'Daniel Waters'],
  'Story by': ['David Arnott', 'James Cappe'],
  'Based on': ['Characters', 'by Rex Weiner'],
  'Starring': ['Andrew Dice Clay',
   'Wayne Newton',
   'Priscilla Presley',
   'Lauren Holly',
   'Morris Day',
   'Robert Englund',
   "Ed O'Neill"],
  'Narrated by': 'Andrew "Dice" Clay',
  'Music by': ['Cliff Eidelman', 'Yello'],
  'Cinematography': 'Oliver Wood',
  'Edited by': 'Michael Tronick',
  'Productioncompany ': 'Silver Pictures',
  'Distributed by': '20th Century Fox',
  'Release date': ['July 11, 1990', '(', '1990-07-11', ')'],
  'Running time': '102 minutes',
  'Country': 'United States',
  'Language': 'English',
  'Budget': '$20 million',


In [8]:
#pull flat csvs into dataframe
kaggle_metadata = pd.read_csv(f'{file_dir}movies_metadata.csv')
ratings = pd.read_csv(f'{file_dir}ratings.csv')

  interactivity=interactivity, compiler=compiler, result=result)


In [12]:
#validate data with .head() .tail() and sample(n=5)
#kaggle_metadata.sample(n=10)

In [13]:
#reading variable into pandas df 
wiki_movies_df = pd.DataFrame(wiki_movies_raw)

In [21]:
wiki_movies_df.head(1)

2

In [None]:
#if we put the columns to a list, this is what it would look like. 
wiki_movies_df.columns.tolist()
len(wiki_movies_df.columns.tolist())

In [None]:
wiki_movies_df.columns
wiki_movies_df['Venue']

In [1]:
wiki_movies_df

NameError: name 'wiki_movies_df' is not defined

In [None]:
#using list comprehension, putting all movies with directors and links in to a variable, and checking on the amount of the result. 
wiki_movies = [movie for movie in wiki_movies_raw
               if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]

wiki_movies

In [None]:
#putting our filtered result into a pandas dataframe and checking on the length of columns. 
wiki_movies_df = pd.DataFrame(wiki_movies)


In [None]:


#create a function to clean movie using an internal variable
def clean_movie(movie):
    movie = dict(movie) #create a non-destructive copy
    alt_titles = {}
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune–Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles
        
    #function to scrub or merge similiar column names, repeated otherwise
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')
    
    return movie



clean_movies = [clean_movie(movie) for movie in wiki_movies]
clean_movies


In [None]:
#putting cleaned movies into a df
wiki_movies_df = pd.DataFrame(clean_movies)
sorted(wiki_movies_df.columns.tolist())

In [None]:
#find movies that have a value in column Arabic 
#wiki_movies_df[wiki_movies_df['Arabic'].notnull()]

In [None]:
#return the URL from our query 
#wiki_movies_df[wiki_movies_df['Arabic'].notnull()]['url']

In [None]:
#sorted columns to investigate 
sorted(wiki_movies_df.columns.tolist())

In [None]:
#extract IMDB ID with regex 
wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
print(len(wiki_movies_df))

#remove duplicate rows 
wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
print(len(wiki_movies_df))
wiki_movies_df.head()

In [None]:
#show how nulls each column has
[[column,wiki_movies_df[column].isnull().sum()] for column in wiki_movies_df.columns]

In [None]:
#show columns that have less than 90% of null value
[column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]

In [None]:
#putting the above logic as qualifier into the dataframe to keep
wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]

In [None]:
#getting ready to convert data types for sql 
wiki_movies_df.dtypes

In [None]:
#start with box office, drop all blanks in box office
box_office = wiki_movies_df['Box office'].dropna() 
box_office

In [None]:
#find all box office that are not strings
def is_not_a_string(x):
    return type(x) != str
box_office[box_office.map(is_not_a_string)]

#find all box office that are not strings using lambda
box_office[box_office.map(lambda x: type(x) != str)]

In [None]:
#clean the box office column when there's a list of data, use the lambda to join them
box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

In [None]:
#get rid of range in box office value
box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
box_office.sample(n=10)

In [None]:
#regex to identify string that matches $123.4 mbillion
form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'

In [None]:
#check for box office that matches our regex form_one
box_office.str.contains(form_one, flags=re.IGNORECASE).sum()

In [None]:
#regex to match $123,456,789 style 
form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'

In [None]:
#check for box office that matches form two
box_office.str.contains(form_two, flags=re.IGNORECASE).sum()

In [None]:
#setting the matches 
matches_form_one = box_office.str.contains(form_one, flags=re.IGNORECASE)
matches_form_two = box_office.str.contains(form_two, flags=re.IGNORECASE)

In [None]:
#checking data frame for negatives
box_office[~matches_form_one & ~matches_form_two]

In [None]:
box_office.str.extract(f'({form_one}|{form_two})')

In [None]:
#function to convert various forms of number in millions and billion from string into values

def parse_dollars(s):
    # if s is not a string, return NaN
    if type(s) != str:
        return np.nan

    # if input is of the form $###.# million
    if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

        # remove dollar sign and " million"
        s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a million
        value = float(s) * 10**6

        # return value
        return value

    # if input is of the form $###.# billion
    elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

        # remove dollar sign and " billion"
        s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a billion
        value = float(s) * 10**9

        # return value
        return value

    # if input is of the form $###,###,###
    elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

        # remove dollar sign and commas
        s = re.sub('\$|,','', s)

        # convert to float
        value = float(s)

        # return value
        return value

    # otherwise, return NaN
    else:
        return np.nan    

In [None]:
#extract the string then parse it into integer value for the data frame
wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
wiki_movies_df['box_office']

In [None]:
#no longer need box office string column so we are dropping it
wiki_movies_df.drop('Box office', axis=1, inplace=True)

In [None]:
#create a budget variable
budget = wiki_movies_df['Budget'].dropna()

In [None]:
#conver any list in budget in to string
budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
budget

In [None]:
#remove any hypen style budget into normal budget
budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)

In [None]:
#running the same cleaning against budget
matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE)
matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE)

#remove citation references [#]
budget = budget.str.replace(r'\[\d+\]\s*', '')
budget[~matches_form_one & ~matches_form_two]

In [None]:
#getting budget parsed 
wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

#dropping old budget column
wiki_movies_df.drop('Budget', axis=1, inplace=True)

In [None]:
#making variable to parse release date, non null and convert list to strings
release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

In [None]:
#date forms
date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}'
date_form_two = r'\d{4}.[01]\d.[123]\d'
date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
date_form_four = r'\d{4}'

In [None]:
#extract dates 
release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)

In [None]:
#using pandas to convert date and time
wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)

In [None]:
wiki_movies_df['release_date']

In [None]:
#parse running time 
running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

In [None]:
#see how many look like xxx m (inutes)
running_time.str.contains(r'\d*\s*m', flags=re.IGNORECASE).sum()

In [None]:
#see the rest 
running_time[running_time.str.contains(r'\d*\s*m', flags=re.IGNORECASE) != True]

In [None]:
#extract the running time into 3 groups, hour, minute, and pure minutes
running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
running_time_extract

In [None]:
#apply lambda to transform nan to numeric
running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
running_time_extract

In [None]:
#convert everything to pure minutes and putting it in dataframe
wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
wiki_movies_df['running_time'] 

In [None]:
#drop old running time column
wiki_movies_df.drop('Running time', axis=1, inplace=True)

In [None]:
##################################################
#############################################################
############ kaggle data starts from here


kaggle_metadata.dtypes

In [None]:
#count the values of adult column result
kaggle_metadata['adult'].value_counts()

In [None]:
#showing the 3 that aren't true or false
kaggle_metadata[~kaggle_metadata['adult'].isin(['True','False'])]


In [None]:
#just drop adult column and all that are not adult == false
kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')

In [None]:
kaggle_metadata['video']

In [None]:
#setting the column into boolean by comparing it 
kaggle_metadata['video'] == 'True'

In [None]:
#putting the result of the comparison into the column
kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'

In [None]:
#convert numeric columns 
kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')



In [None]:
#conver release date to date and time format
kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

In [None]:
#get information on ratings df
ratings.info(null_counts=True)

In [None]:
#testing the timestamp format
pd.to_datetime(ratings['timestamp'], unit='s')

In [None]:
#setting the timestamps in to the df column
ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')

In [None]:
#checkign the ratings data via histogram and describe to see layout
ratings['rating'].plot(kind='hist')
ratings['rating'].describe()

In [None]:
#adding suffix while merging, same columns will be distinguished by suffix 
movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])


In [None]:
#examine the columns for duplicates with the merge
movies_df.columns

In [None]:
# Competing data:
# Wiki                     Movielens                Resolution
#--------------------------------------------------------------------------
# title_wiki               title_kaggle             drop wiki
# running_time             runtime                  Keep Kaggle; fill in zeros with Wikipedia data.
# budget_wiki              budget_kaggle            Keep Kaggle; fill in zeros with Wikipedia data.
# box_office               revenue                  Keep Kaggle; fill in zeros with Wikipedia data.
# release_date_wiki        release_date_kaggle      Drop Wikipedia.
# Language                 original_language        Drop Wikipedia.
# Production company(s)    production_companies     Drop Wikipedia.

In [None]:
#looking at the titles
movies_df[['title_wiki','title_kaggle']]

In [None]:
#looking for rows that don't match - df[condtion][[extract]]
movies_df[movies_df['title_wiki'] != movies_df['title_kaggle']][['title_wiki','title_kaggle']]

In [None]:
#check to see if there's any missing titles in kaggle
movies_df[(movies_df['title_kaggle'] == '') | (movies_df['title_kaggle'].isnull())]

In [None]:
#using scatter plot to compare wiki from kaggle on runtime 
#fill null with 0 to get the full picture
movies_df.fillna(0).plot(x='running_time', y='runtime', kind='scatter')

In [None]:
#use plot again for budget 
movies_df.fillna(0).plot(x='budget_wiki',y='budget_kaggle', kind='scatter')

In [None]:
#use plot for box office 
movies_df.fillna(0).plot(x='box_office', y='revenue', kind='scatter')

In [None]:
#box office again for less than 1 billion, since previous was too large 
movies_df.fillna(0)[movies_df['box_office'] < 10**9].plot(x='box_office', y='revenue', kind='scatter')

In [None]:
#plot for release date, using line plot for dates but with dots as a workaround
movies_df[['release_date_wiki','release_date_kaggle']].plot(x='release_date_wiki', y='release_date_kaggle', style='.')

In [None]:
#looking at the outlier with condition shown by the graph
movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')]
#myint = movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')]
#myint[['release_date_wiki', 'release_date_kaggle']]

In [None]:
#identify the index 
movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index


In [None]:
#dropping the outlier since its corrupt
movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)

In [None]:
#showing the index row of 3631
movies_df.loc[[3631]]

In [None]:
#an alternative way of dropping by index 
movies_df.drop(movies_df.index[[3631]])

In [None]:

#pd.set_option('display.max_rows', 500)
print(movies_df[movies_df['release_date_wiki'].isnull()][['release_date_kaggle']])

In [None]:
#count number of languages
movies_df['Language'].value_counts(dropna=False)

In [None]:
#convert list to tuple if necessary
movies_df['Language'].apply(lambda x: tuple(x) if type(x) == list else x).value_counts(dropna=False)

In [None]:
#kaggle data on languages

movies_df['original_language'].value_counts(dropna=False)

In [None]:
#looking at the production companys columns to get a sense of the data
#movies_df.columns
movies_df[['Production company(s)','production_companies']].sample(n=5)

In [None]:
#drop the columns that we don't want after decision is made
movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

In [None]:
#function to fill missing data then drop redundant column
def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
    df[kaggle_column] = df.apply(
        lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
        , axis=1)
    df.drop(columns=wiki_column, inplace=True)

In [None]:
#run function for the columns filling from wiki to kaggle
fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')
movies_df

In [None]:
#identify any column with only 1 value, if they are identical (1 value) then the data is not useful
for col in movies_df.columns:
    lists_to_tuples = lambda x: tuple(x) if type(x) == list else x
    value_counts = movies_df[col].apply(lists_to_tuples).value_counts(dropna=False)
    num_values = len(value_counts)
    if num_values == 1:
        print(col)
        


In [None]:
# [dog for dog in dogs if dog['weight'] > 30]

# Here,

#     the first dog is the expression
#     the second dog is the element
#     dogs is the source_list
#     dog['weight'] > 30 is the filter_expression
# even_numbers = [x for x in range(100) if x % 2 == 0]
# even_numbers
# Note that we can use the element in our filter expression
#lc = [col for col in movies_df.columns: if movies_df[col].value_counts(dropna=False) == 1]
#lc

In [None]:
#movies_df(movies_df[col].value_counts(dropna=False) == 0)
#movies_df[col]


In [None]:
#checking on the column's value against total rows 
movies_df['video'].value_counts(dropna=False)
movies_df.columns



In [None]:
#dropping video columns from df
movies_df = movies_df.drop(['video'],axis=1)
movies_df.columns

In [None]:
#reordering the columns for ease of consumption
column_names = ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on']
movies_df = movies_df.reindex(columns=column_names)
movies_df


In [None]:
movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

In [None]:
movies_df

In [None]:
ratings_counts = []
#transform and merging rating data
#count how many times a movie received a given rating. This way, someone who wants to calculate statistics for the dataset would have all the information they need. 
rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) 

In [None]:
rating_counts

In [None]:
# pivot this data so that movieId is the index, the columns will be all the rating values, and the rows will be the counts for each rating value.
ratings_coutns = []
rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')

In [None]:
#We want to rename the columns so they’re easier to understand. We’ll prepend rating_ to each column with a list comprehension:
rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

In [None]:
rating_counts

In [None]:
#merge rating info into movie_df 
movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')

In [None]:
#fill in NaN with 0s 
movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

In [None]:
movies_with_ratings_df

In [None]:
#import df to sql 
movies_df.to_sql(name='movies', con=engine)

In [None]:
#import ratings data by breaking up chunks and monitoring the progress 
#step 1 print progress as we make progress
# rows_imported = 0
# for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):

#     print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
#     data.to_sql(name='ratings', con=engine, if_exists='append')
#     rows_imported += len(data)

#     print(f'Done.')
    
#step 2  add elapsed time of progression
rows_imported = 0
# get the start_time from time.time()
start_time = time.time()
for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):
    print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
    data.to_sql(name='ratings', con=engine, if_exists='append')
    rows_imported += len(data)

    # add elapsed time to final print out
    print(f'Done. {time.time() - start_time} total seconds elapsed')