In [1]:
import json
import pandas as pd
import numpy as np
import os
import re
from sqlalchemy import create_engine
from config import db_password
import time
import sys

In [2]:
file_dir = "C:/Users/tomas/Documents/DABEx/Modules/Module_8/Movies-ETL/Movies-ETL/Resources"

In [3]:
with open (f'{file_dir}/wikipedia.movies.json', mode= 'r') as file:
    wiki_movies_raw = json.load(file)

In [4]:
kaggle_metadata_file_path = os.path.join("Resources","movies_metadata.csv")
ratings_file_path = os.path.join("Resources","ratings.csv")

In [5]:
kaggle_metadata = pd.read_csv(kaggle_metadata_file_path, low_memory = False)
ratings = pd.read_csv(ratings_file_path, low_memory= False)
wiki_movies = pd.DataFrame(wiki_movies_raw)

In [6]:
def mega_function(kaggle_metadata, wiki_movies,ratings):

    def clean_movie(movie):
        movie = dict(movie) #create a non-destructive copy
        alt_titles = {}
        # combine alternate titles into one list
        for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune-Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
            if key in movie:
                alt_titles[key] = movie[key]
                movie.pop(key)
        if len(alt_titles) > 0:
            movie['alt_titles'] = alt_titles

    # merge column names
        def change_column_name(old_name, new_name):
            if old_name in movie:
                movie[new_name] = movie.pop(old_name)
        
        change_column_name('Adaptation by', 'Writer(s)')
        change_column_name('Country of origin', 'Country')
        change_column_name('Directed by', 'Director')
        change_column_name('Distributed by', 'Distributor')
        change_column_name('Edited by', 'Editor(s)')
        change_column_name('Length', 'Running time')
        change_column_name('Original release', 'Release date')
        change_column_name('Music by', 'Composer(s)')
        change_column_name('Produced by', 'Producer(s)')
        change_column_name('Producer', 'Producer(s)')
        change_column_name('Productioncompanies ', 'Production company(s)')
        change_column_name('Productioncompany ', 'Production company(s)')
        change_column_name('Released', 'Release Date')
        change_column_name('Release Date', 'Release date')
        change_column_name('Screen story by', 'Writer(s)')
        change_column_name('Screenplay by', 'Writer(s)')
        change_column_name('Story by', 'Writer(s)')
        change_column_name('Theme music composer', 'Composer(s)')
        change_column_name('Written by', 'Writer(s)')

        return movie


    wiki_movies_dict = [movie for movie in wiki_movies_raw if ('Director' or 'Directed by' in movie) and 'imdb_link' in movie]

    clean_movies = [clean_movie(movie) for movie in wiki_movies_dict]

    wiki_movies = pd.DataFrame(clean_movies)

#Cleaning Wiki_movies function

    def clean_wiki(wiki_movies):
    
        wiki_movies["imdb_id"] = wiki_movies["imdb_link"].str.extract(r'(tt\d{7})')
    
        wiki_movies.drop_duplicates(subset ='imdb_id', inplace = True)
    
        def parse_dollars(s):
    
            if type(s) != str:
                return np.nan

            if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
                s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a million
                value = float(s) * 10**6
                return value

    # if input is of the form $###.# billion
            elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):
                s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a billion
                value = float(s) * 10**9
                return value
    # if input is of the form $###,###,###
            elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):
        # remove dollar sign and commas
                s = re.sub('\$|,','', s)
        # convert to float
                value = float(s)
        # return value
                return value
    # otherwise, return NaN
            else:
                return np.nan
 
        form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
        form_two =  r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
        bad_form_one = r'\$.*[-—–](?![a-z])'
        bad_form_two = r'\[\d+\]\s*'
    
        date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[1-3]\d,\s\d{4}'
        date_form_two = r'\d{4}.[01]\d.[123]\d'
        date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
        date_form_four = r'\d{4}'
    
        time_form_one = r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m'

    #cleaning imdb
        try:
            wiki_movies["imdb_id"] = wiki_movies["imdb_link"].str.extract(r'(tt\d{7})')
            wiki_movies.drop_duplicates(subset ='imdb_id', inplace = True)
        except (AttributeError,KeyError):
            print("Box_office_Unexpected error:", sys.exc_info()[0])
            pass        
    
    #cleanig box office data
        try:
            box_office = wiki_movies['Box office'].dropna()
            box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)
            box_office = box_office.str.replace(f'({bad_form_one})', '$', regex=True) 
            wiki_movies['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
            wiki_movies.drop('Box office', axis=1, inplace=True)
        except (AttributeError,KeyError):
            print("Box_office_Unexpected error:", sys.exc_info()[0])
            pass
    
    #cleaning budget data
        try:
            budget = wiki_movies['Budget'].dropna()
            budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
            budget = budget.str.replace(f'({bad_form_one})', '$', regex=True)
            budget = budget.str.replace(f'({bad_form_two})', '')
            wiki_movies['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
            wiki_movies.drop('Budget', axis=1, inplace=True)
        except (AttributeError,KeyError):
            print("Budget_Unexpected error:", sys.exc_info()[0])
            pass
        
    #cleaing release date data    
        try:
            release_date = wiki_movies['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
            wiki_movies['release_date']=release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)
            wiki_movies.drop('Release date', axis=1, inplace=True)
        except (AttributeError,KeyError):
            print("Release_date_Unexpected error:", sys.exc_info()[0])
            pass
    
    #cleaning runing time data
        try:
            running_time = wiki_movies['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
            running_time_extract = running_time.str.extract(f'({time_form_one})')
            running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
            wiki_movies['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
            wiki_movies.drop('Running time', axis=1, inplace=True)
        except (AttributeError,KeyError):
            print("Running_time_Unexpected error:", sys.exc_info()[0])
            pass

  
    #Checkpoint
        print('OKa')
    
        
        return wiki_movies

    wiki_movies = clean_wiki(wiki_movies)

#Cleaning kaggle_df function

    def cleaning_kaggle(kaggle_metadata):
    
        try:
            kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')
            kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
            kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
            kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
            kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
            kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
        except (AttributeError,KeyError):
            print("Box_office_Unexpected error:", sys.exc_info()[0])
            pass
        return kaggle_metadata

    
    kaggle_metadata=cleaning_kaggle(kaggle_metadata)

    movies_df = pd.merge(wiki_movies, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])
    

#Cleaning movies_df function
   
    def movies_clean(movies_df):
        try:
            def fill_missing_kaggle_data(movies_df, kaggle_column, wiki_column):
                movies_df[kaggle_column] = movies_df.apply(lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column], axis=1)
                movies_df.drop(columns=wiki_column, inplace=True)
        
                return movies_df
    
        
            movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)
            movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)
        
            fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
            fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
            fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')
    
            movies_df = movies_df[['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country','production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on']]
            movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)
        except (AttributeError,KeyError):
            print("Movies clean error:", sys.exc_info()[0])
        pass
    
        return movies_df
    
    print("okb")
#Preparing to merge data framess
    try:
        movies_df = movies_clean(movies_df)
    
        rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count().rename({'userId':'count'}, axis=1).pivot(index='movieId',columns='rating', values='count')
        rating_counts.columns = ['rating_'+str(col) for col in rating_counts.columns]
        movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
        movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)
    except (AttributeError,KeyError):
        print("Movies clean error:", sys.exc_info()[0])
    pass
#check point
    print('OKc')
    
    
    
    #Updating the data table by removing records and inserting new (movie_df only per HW request)
    db_string = f"postgres://postgres:{db_password}@127.0.0.1:5432/movie_data"
    engine = create_engine(db_string)
    
    #deleting all data from the table "movies"
    try:
        query = "DELETE FROM movies"
        connection = engine.connect()
        connection.execute(query)
        connection.close()
    except (ValueError):
        print("Table already loaded")
    pass
    
    rows_imported = 0
    try:
        movies_df.to_csv(f'{file_dir}/new_movies.csv', index = False)
        # get the start_time from time.time()
        start_time = time.time()
    
        for data in pd.read_csv(f'{file_dir}/new_movies.csv', chunksize=100):
            print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
            data.to_sql(name='movies', con=engine, if_exists='append')
            rows_imported += len(data)

        # add elapsed time to final print out
            print(f'Done. {time.time() - start_time} total seconds elapsed')

    except (ValueError):
        print("Table already loaded")
    pass

       
    
    return movies_with_ratings_df

In [7]:
my_data = mega_function(kaggle_metadata, wiki_movies,ratings)

OKa
okb
OKc
importing rows 0 to 100...Done. 0.1130983829498291 total seconds elapsed
importing rows 100 to 200...Done. 0.22275733947753906 total seconds elapsed
importing rows 200 to 300...Done. 0.3316996097564697 total seconds elapsed
importing rows 300 to 400...Done. 0.4364771842956543 total seconds elapsed
importing rows 400 to 500...Done. 0.5447883605957031 total seconds elapsed
importing rows 500 to 600...Done. 0.6541545391082764 total seconds elapsed
importing rows 600 to 700...Done. 0.7696559429168701 total seconds elapsed
importing rows 700 to 800...Done. 0.868156909942627 total seconds elapsed
importing rows 800 to 900...Done. 0.9690794944763184 total seconds elapsed
importing rows 900 to 1000...Done. 1.0598564147949219 total seconds elapsed
importing rows 1000 to 1100...Done. 1.1251537799835205 total seconds elapsed
importing rows 1100 to 1200...Done. 1.2315566539764404 total seconds elapsed
importing rows 1200 to 1300...Done. 1.339611291885376 total seconds elapsed
importing

In [8]:
my_data

Unnamed: 0,imdb_id,kaggle_id,title,original_title,tagline,belongs_to_collection,wikipedia_url,imdb_link,runtime,budget,...,rating_0.5,rating_1.0,rating_1.5,rating_2.0,rating_2.5,rating_3.0,rating_3.5,rating_4.0,rating_4.5,rating_5.0
0,tt0098987,9548,The Adventures of Ford Fairlane,The Adventures of Ford Fairlane,Kojak. Columbo. Dirty Harry. Wimps.,,https://en.wikipedia.org/wiki/The_Adventures_o...,https://www.imdb.com/title/tt0098987/,104.0,49000000.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,tt0098994,25501,"After Dark, My Sweet","After Dark, My Sweet",All they risked was everything.,,"https://en.wikipedia.org/wiki/After_Dark,_My_S...",https://www.imdb.com/title/tt0098994/,114.0,6000000.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,tt0099005,11856,Air America,Air America,The few. The proud. The totally insane.,,https://en.wikipedia.org/wiki/Air_America_(film),https://www.imdb.com/title/tt0099005/,112.0,35000000.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,tt0099012,8217,Alice,Alice,,,https://en.wikipedia.org/wiki/Alice_(1990_film),https://www.imdb.com/title/tt0099012/,102.0,12000000.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,tt0099018,25943,Almost an Angel,Almost an Angel,Who does he think he is?,,https://en.wikipedia.org/wiki/Almost_an_Angel,https://www.imdb.com/title/tt0099018/,95.0,25000000.0,...,3.0,0.0,3.0,2.0,5.0,26.0,37.0,46.0,16.0,11.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6055,tt5639354,429191,A Fantastic Woman,Una mujer fantástica,,,https://en.wikipedia.org/wiki/A_Fantastic_Woman,https://www.imdb.com/title/tt5639354/,104.0,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6056,tt5390066,390059,Permission,Permission,,,https://en.wikipedia.org/wiki/Permission_(film),https://www.imdb.com/title/tt5390066/,96.0,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6057,tt6304162,429174,Loveless,Нелюбовь,,,https://en.wikipedia.org/wiki/Loveless_(film),https://www.imdb.com/title/tt6304162/,128.0,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6058,tt5795086,412302,Gemini,Gemini,,,https://en.wikipedia.org/wiki/Gemini_(2017_film),https://www.imdb.com/title/tt5795086/,92.0,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
