In [82]:
# Import Dependencies
import json
import pandas as pd
import numpy as np
import re
from sqlalchemy import create_engine
import psycopg2
from config import postgres_pwd
import time

In [83]:
# Clean Movie
# Keys that hold a title in another language or script; clean_movie() moves
# them out of the top level of each record.
alt_langs = [
    'Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French', 'Hangul',
    'Hebrew', 'Hepburn', 'Japanese', 'Literally', 'Mandarin',
    'McCune–Reischauer', 'Original title', 'Polish', 'Revised Romanization',
    'Romanized', 'Russian', 'Simplified', 'Traditional', 'Yiddish',
]


def clean_movie(x):
    """Return a normalised copy of one movie record.

    Two things happen to the copy (the argument itself is not modified):

    1. Every key listed in ``alt_langs`` is removed and collected into a
       single nested dict stored under ``'Alternate Title'`` (only added when
       at least one such key was present).
    2. Keys that are spelling variants of the same field are renamed to one
       canonical key. Renames are applied in the order listed below, so when
       several variants are present the last one processed supplies the value.

    Parameters
    ----------
    x : mapping
        Anything accepted by ``dict()``.

    Returns
    -------
    dict
        The cleaned record.
    """
    movie = dict(x)

    # Consolidate titles from all languages under one key
    alt_titles = {lang: movie.pop(lang) for lang in alt_langs if lang in movie}
    if alt_titles:
        movie['Alternate Title'] = alt_titles

    # Consolidate similar columns: (variant key, canonical key), order matters
    renames = (
        ('Adaptation by', 'Writer(s)'),
        ('Written by', 'Writer(s)'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Music by', 'Composer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Original release', 'Release date'),
        ('Released', 'Release date'),
        ('Release Date', 'Release date'),
    )
    for variant, canonical in renames:
        if variant in movie:
            movie[canonical] = movie.pop(variant)

    return movie

In [84]:
# Extract, Transform and Load
def extract_transform_load(wiki,kaggle,ratings):
    """Run the movie ETL pipeline and load the result into PostgreSQL.

    Steps:
      1. Read the Wikipedia JSON, keep records that have a director and an
         IMDb link and are not episodic, and normalise them with clean_movie().
      2. Parse box office, budget, release date and running time out of the
         free-text Wikipedia fields.
      3. Read the Kaggle metadata CSV, drop adult titles and coerce dtypes.
      4. Inner-join both sources on ``imdb_id`` and back-fill zero-valued
         Kaggle fields from the Wikipedia values.
      5. Pivot the ratings CSV into per-movie rating counts.
      6. Write ``movies`` and ``ratings`` tables to the ``movie_data``
         database on localhost:5432 (user ``postgres``).

    Parameters
    ----------
    wiki : str or path-like
        Path to the Wikipedia movies JSON file.
    kaggle : str or path-like
        Path to the Kaggle movies metadata CSV.
    ratings : str or path-like
        Path to the ratings CSV (must contain ``movieId``, ``rating`` and
        ``userId`` columns).

    Returns
    -------
    None
        The function works through side effects: it replaces the ``movies``
        table, rebuilds the ``ratings`` table and prints load progress.
    """

    def join_if_list(value):
        """Wikipedia fields are sometimes lists of strings; flatten those."""
        return ' '.join(value) if isinstance(value, list) else value

    def parse_dollars(x):
        """Convert an extracted dollar string to a float, or NaN if unparseable."""
        if type(x) != str:
            return np.nan
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', x, flags=re.IGNORECASE):
            return float(re.sub(r'\$|[a-zA-Z]|\s', '', x)) * 10**6
        if re.match(r'\$\s*\d+\.?\d*\s*billi?on', x, flags=re.IGNORECASE):
            return float(re.sub(r'\$|[a-zA-Z]|\s', '', x)) * 10**9
        if re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illi?on)', x, flags=re.IGNORECASE):
            # The pattern accepts both ',' and '.' as thousands separators, so
            # both must be stripped; otherwise "$1.234.567" cannot be parsed
            # and "$3.500" would be read as 3.5 dollars.
            return float(re.sub(r'\$|\s|,|\.', '', x))
        return np.nan

    # Extract Wiki Data (use the path passed in, not a notebook global)
    with open(wiki, 'r', encoding='utf-8') as wiki_json:
        wiki_list = json.load(wiki_json)
    wiki_list = [i for i in wiki_list
                 if ('Director' in i or 'Directed by' in i)
                   and 'imdb_link' in i
                   and 'No. of episodes' not in i
                ]
    wiki_df = pd.DataFrame([clean_movie(i) for i in wiki_list])

    # Transform Wiki Data - Extract IMDB ID
    try:
        wiki_df['imdb_id'] = wiki_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_df = wiki_df.drop_duplicates(subset='imdb_id')
    except Exception as err:
        # Everything downstream joins on imdb_id, so report and stop here
        # instead of failing later with a less helpful error.
        print(f'Error extracting IMDb IDs: {err}')
        raise

    # Transform Wiki Data - Drop Columns with Null values in 90% of rows
    wiki_columns_to_keep = [i for i in wiki_df if wiki_df[i].count() >= (len(wiki_df)*0.1)]
    # .copy() so the column assignments below act on an independent frame
    wiki_df = wiki_df[wiki_columns_to_keep].copy()

    # Transform Wiki Data - Clean Box Office
    box_off = wiki_df['Box office'].dropna().apply(join_if_list)
    # Collapse ranges such as "$1-2 million" to their upper bound
    box_off = box_off.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    format_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
    format_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illi?on)'
    wiki_df['box_office'] = box_off.str.extract(f'({format_one}|{format_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_df = wiki_df.drop('Box office', axis=1)

    # Transform Wiki Data - Clean Budget
    bud = wiki_df['Budget'].dropna().apply(join_if_list)
    # regex=True is required: pandas >= 2.0 treats the pattern as a literal by default
    bud = bud.str.replace(r'\[\d+\]\s*', '', regex=True)
    bud = bud.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    format_one = r'\$\s*\d+\.?\d*\s*mill?i?on'
    format_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\smillion)'
    wiki_df['budget'] = bud.str.extract(f'({format_one}|{format_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_df = wiki_df.drop('Budget', axis=1)

    # Transform Wiki Data - Clean Release Date
    rel_dt = wiki_df['Release date'].dropna().apply(join_if_list)
    month = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)'
    format_one = month + r'\s\d{1,2}\,\s\d{4}'
    format_two = r'(?:\d{1,2}\s)?' + month + r' \d{4}'
    format_three = r'(' + month + r',\s\d{4})'
    format_four = r'\d{4}.[01]\d.[0123]\d'
    format_five = r'(^\d{4})'
    # NOTE(review): the extracted strings come in several layouts; on
    # pandas >= 2.0 this call may need format='mixed' - confirm against the
    # pandas version in use.
    wiki_df['release_date'] = pd.to_datetime(rel_dt.str.extract(f'({format_one}|{format_two}|{format_three}|{format_four}|{format_five})', flags=re.IGNORECASE)[0])
    wiki_df = wiki_df.drop('Release date', axis=1)

    # Transform Wiki Data - Clean Run Time
    run = wiki_df['Running time'].dropna().apply(join_if_list)
    format_one = r'(\d+)\s*ho?u?r?s?\s*(\d*)'
    format_two = r'^(\d+)\s*m?'
    run_time_extract = run.str.extract(f'({format_one}|{format_two})', flags=re.IGNORECASE)
    run_time_extract = run_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    # Extracted columns: 0 = whole match, 1 = hours, 2 = minutes following the
    # hours, 3 = minutes-only form. Use the minutes-only value when present,
    # otherwise hours*60 + minutes, so that e.g. "2 hours" becomes 120 rather
    # than 0.
    minutes_only = run_time_extract[3]
    hours_and_minutes = run_time_extract[1] * 60 + run_time_extract[2]
    wiki_df['running_time'] = minutes_only.where(minutes_only != 0, hours_and_minutes)
    wiki_df = wiki_df.drop('Running time', axis=1)

    # Extract & Transform Kaggle Data
    # low_memory=False infers each column's dtype from the whole file rather
    # than chunk by chunk, which avoids pandas' mixed-type DtypeWarning.
    kaggle_df = pd.read_csv(kaggle, low_memory=False)
    kaggle_df = kaggle_df.loc[kaggle_df['adult'] == 'False'].drop(['adult'],axis=1)
    kaggle_df['video'] = kaggle_df['video'] == True
    kaggle_df['budget'] = pd.to_numeric(kaggle_df['budget'], errors='raise')
    kaggle_df['id'] = pd.to_numeric(kaggle_df['id'], errors='raise')
    kaggle_df['popularity'] = pd.to_numeric(kaggle_df['popularity'], errors='raise')
    kaggle_df['release_date'] = pd.to_datetime(kaggle_df['release_date'])

    # Merge Wiki and Kaggle into Movies DataFrame
    movies_df = pd.merge(wiki_df, kaggle_df, on='imdb_id', suffixes=['_wiki','_kaggle'])

    # Clean Movies DF
    # Deleting this record because of title mismatch between Wiki/Kaggle
    movies_df = movies_df[movies_df['id']!=11426].copy()

    def fill_missing_data(df,source,destination):
        """Where `destination` is 0, take the value from `source` instead."""
        df[destination] = df[destination].where(df[destination] != 0, df[source])

    fill_missing_data(movies_df,'running_time','runtime')
    fill_missing_data(movies_df,'budget_wiki','budget_kaggle')
    fill_missing_data(movies_df,'box_office','revenue')
    movies_df = movies_df.loc[:, [
        'imdb_id',
        'id',
        'title_kaggle',
        'original_title',
        'tagline',
        'belongs_to_collection',
        'url',
        'imdb_link',
        'runtime',
        'budget_kaggle',
        'revenue',
        'release_date_kaggle',
        'popularity',
        'vote_average',
        'vote_count',
        'genres',
        'original_language',
        'overview',
        'spoken_languages',
        'Country',
        'production_companies',
        'production_countries',
        'Distributor',
        'Producer(s)',
        'Director',
        'Starring',
        'Cinematography',
        'Editor(s)',
        'Writer(s)',
        'Composer(s)',
        'Based on'
        ]]
    movies_df = movies_df.rename({'id':'kaggle_id',
                                  'title_kaggle':'title',
                                  'url':'wikipedia_url',
                                  'budget_kaggle':'budget',
                                  'release_date_kaggle':'release_date',
                                  'Country':'country',
                                  'Distributor':'distributor',
                                  'Producer(s)':'producers',
                                  'Director':'director',
                                  'Starring':'starring',
                                  'Cinematography':'cinematography',
                                  'Editor(s)':'editors',
                                  'Writer(s)':'writers',
                                  'Composer(s)':'composers',
                                  'Based on':'based_on'
                                 },
                                 axis='columns')

    # Extract & Transform Ratings Data
    ratings_df = pd.read_csv(ratings)
    rating_counts = (
        ratings_df.groupby(['movieId','rating'], as_index=False).count()
        .rename({'userId':'count'}, axis=1)
        .pivot(index='movieId',columns='rating', values='count')
    )
    rating_counts.columns = ['rating_' + str(i) for i in rating_counts.columns]

    # Merge Movies and Ratings Data
    # NOTE(review): movies_with_ratings_df is built here but is neither
    # written to the database nor returned - confirm whether it should be.
    movies_with_ratings_df = pd.merge(
                                    movies_df,
                                    rating_counts,
                                    how='left',
                                    left_on='kaggle_id',
                                    right_index=True
                                )
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    # Establish Database Connection
    # SQLAlchemy 1.4+ only accepts the "postgresql" dialect name in the URL.
    db_string = f"postgresql://postgres:{postgres_pwd}@localhost:5432/movie_data"
    engine = create_engine(db_string)

    # Create SQL Tables
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')
    rows_imported = 0
    start_time = time.time()
    for data in pd.read_csv(ratings, chunksize=1000000):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        # Replace the table on the first chunk and append afterwards, so that
        # re-running the notebook does not duplicate every rating.
        data.to_sql(name='ratings', con=engine,
                    if_exists='replace' if rows_imported == 0 else 'append')
        rows_imported += len(data)
        print(f'Done. {time.time() - start_time} total seconds elapsed')

In [85]:
# Assign file paths: the three raw inputs all live in one data directory
file_dir = '../Data/'
wiki_file, kaggle_file, ratings_file = (
    file_dir + file_name
    for file_name in ('wikipedia-movies.json', 'movies_metadata.csv', 'ratings.csv')
)

In [86]:
# Run ETL Function on the three input files assigned in the previous cell
extract_transform_load(
    wiki=wiki_file,
    kaggle=kaggle_file,
    ratings=ratings_file,
)

  if (await self.run_code(code, result,  async_=asy)):


importing rows 0 to 1000000...Done. 437.17898201942444 total seconds elapsed
importing rows 1000000 to 2000000...Done. 770.2851700782776 total seconds elapsed
importing rows 2000000 to 3000000...Done. 1123.565827846527 total seconds elapsed
importing rows 3000000 to 4000000...Done. 1487.373540878296 total seconds elapsed
importing rows 4000000 to 5000000...Done. 1893.2138838768005 total seconds elapsed
importing rows 5000000 to 6000000...Done. 2273.2231121063232 total seconds elapsed
importing rows 6000000 to 7000000...Done. 2639.6773982048035 total seconds elapsed
importing rows 7000000 to 8000000...Done. 3085.395299911499 total seconds elapsed
importing rows 8000000 to 9000000...Done. 3518.668941259384 total seconds elapsed
importing rows 9000000 to 10000000...Done. 3943.8692231178284 total seconds elapsed
importing rows 10000000 to 11000000...Done. 4261.564292907715 total seconds elapsed
importing rows 11000000 to 12000000...Done. 4641.88893198967 total seconds elapsed
importing row