In [1]:
import time 
import json
import pandas as pd
import numpy as np

import re
import dateutil
from sqlalchemy import create_engine
import psycopg2
from config import db_password

In [2]:
db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movies_data" 

engine = create_engine(db_string)

In [3]:
def parse_dollars(s):  
    s = str(s)
    match = re.search(r"\$([\d,.]+)[^\d,.]?.*([mb])il",s) 
    if not match: 
        return np.nan 
    digit = float(match[1].replace(",",""))
    try: 
        multiplier=match[2]  
    except IndexError: 
        multiplier = 1.0 
    else: 
        multiplier = {"m":1e6, "b":1e9}[multiplier] 
    return digit*multiplier 

def parse_date(s): 
    if not isinstance(s,list): 
        s=[s] 
    for i in s: 
        try: 
            date = pd.to_datetime(i) 
        except: 
            continue 
        else: 
            return date 
        return np.nan 
    
def parse_time(s): 
    s = str(s) 
    m = re.search(r"(\d+)", s) 
    if not m: 
        return np.nan 
    else: 
        return pd.to_timedelta(float(m[1]),"min") 

In [4]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):

    
    
    

    return movie

In [5]:
def wkm():
    # 2. Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(kaggle_file) 
    ratings = pd.read_csv(ratings_file) 
    
    # 3. Open the read the Wikipedia data JSON file.
    with open (wiki_file,'r') as f: 
        wiki_movies = json.load(f) 
    wiki_movies = [entry for entry in wiki_movies if "No. of episodes" not in entry] 
    wiki_movies = [clean_movie(entry) for entry in wiki_movies]

    wiki_movies_with_id = []
    imdb_ids = set()
    for entry in wiki_movies:  
        try: 
            m = re.search(r"title/(tt\d{7,8})/", entry["imdb_link"]) 
            imdb_id = m[1]
        except Exception as e:  
            print(f"{entry.get('title', 'N/A')}: {e}")
            continue
        if imdb_id in imdb_ids: 
            continue 
        entry["imdb_id"] = imdb_id
        imdb_ids.add(imdb_id)
        wiki_movies_with_id.append(entry)
        
    wiki_movies_df = pd.DataFrame(wiki_movies_with_id)  
    wiki_movies_df = wiki_movies_df.dropna(how='all',axis=1)
    #box_office = wiki_movies_df["Box office"].dropna()
    wiki_movies_df["Box office"] = wiki_movies_df["Box office"].apply(parse_dollars) 
    wiki_movies_df["Release date"] = wiki_movies_df["Release date"].apply(parse_date) 
    wiki_movies_df["Running time"] = wiki_movies_df["Running time"].apply(parse_time)  
    
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns') 
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True' 
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise') 
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date']) 
    # 5. Return the three DataFrames
    # 5. Return the three DataFrames
    
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'], how= "inner")  
    #Drop useless columns that clutter
    movies_df.drop(columns=
        [ "Polish", "Chinese", "Yiddish", "Arabic", 'Hebrew', "Russian", "Cantonese"
        , "Japanese", "McCune–Reischauer", "Revised Romanization", "Hangul", "French", "Mandarin"
        ], inplace=True)
    movies_df.drop(columns=["Hepburn","Species"], inplace=True) 
    #Drop columns required by step 5 
    movies_df.drop(columns=["Original language(s)","Original language","Language"], inplace=True)
    movies_df.drop(columns=['Production company(s)', 'Productioncompanies ', 'Productioncompany '], inplace=True)
    movies_df.drop(columns=['Release date'], inplace=True) 
    movies_df.drop(columns=['Original title', 'title_wiki'], inplace=True) 

    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
            , axis=1)
        df.drop(columns=[wiki_column], inplace=True)

    fill_missing_kaggle_data(movies_df, 'revenue', 'Box office')
    fill_missing_kaggle_data(movies_df, 'budget', 'Budget') 
    fill_missing_kaggle_data(movies_df, 'runtime', 'Running time')



    movies_df = (movies_df.loc[:, 
        ["imdb_id",'id','title_kaggle','original_title','tagline','belongs_to_collection', 'url','imdb_link','runtime', 
         'budget','revenue','release_date','popularity','vote_average', 'vote_count','genres','original_language', 
         'overview','spoken_languages','Country','production_companies','production_countries','Distributor','Producer(s)',
        'Director','Starring','Cinematography','Editor(s)','Written by','Composer(s)','Based on']])

    movies_df.rename({'id':'kaggle_id',
                      'title_kaggle':'title',
                      'url':'wikipedia_url',
                      'Country':'country',
                      'Distributor':'distributor',
                      'Producer(s)':'producers',
                      'Director':'director',
                      'Starring':'starring',
                      'Cinematography':'cinematography',
                      'Editor(s)':'editors',
                      'Written by':'writers',
                      'Composer(s)':'composers',
                      'Based on':'based_on'
                     }, axis='columns', inplace=True)

    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1) \
                    .pivot(index='movieId',columns='rating', values='count')

    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]  
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    
    #return wiki_movies_df, kaggle_metadata, ratings
    #return wiki_movies_df, movies_with_ratings_df, rating_counts
    
    # Return
    #  * wiki file (cleaned, un-merged)
    #  * wiki file (cleaned) + kaggle file (cleaned) + ratings file (cleaned)
    #  * wiki file (cleaned) + kaggle file (cleaned)
    #return wiki_movies_df, movies_with_ratings_df, movies_df

    # Return
    #  * wiki file (cleaned, un-merged)
    #  * wiki file (cleaned) + kaggle file (cleaned)
    #  * wiki file (cleaned) + kaggle file (cleaned) + ratings file (cleaned)
    #  * kaggle_file (cleaned)
    #  * ratings file (cleaned)
    #return wiki_movies_df, movies_df, movies_with_ratings_df, kaggle_metadata, rating_counts

In [7]:
file_dir = "//Users/johncurran/Desktop/Rutgers Data Sci Bootcamp/Challenges/ETL Challenge (DONE)/archive"
# Wikipedia data
wiki_file = f'{file_dir}/wikipedia-movies.json'
# Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

# 7. Set the three variables in Step 6 equal to the function created in Step 1.
wiki_file, kaggle_file, ratings_file = wkm() #=extract_transform_load() 
#wiki_file, wiki_kaggle_file, wiki_kaggle_ratings_file, kaggle_file, ratings_file = wkm()

In [8]:
# 12. Set the DataFrames from the return statement equal to the file names in Step 11. 
wiki_movies_df = wiki_file
movies_with_ratings_df = kaggle_file
movies_df = ratings_file 

In [9]:
# Do Not run yet! STEP 1
rows_imported = 0
for data in pd.read_csv(f'{file_dir}/ratings.csv', chunksize=1000000):

    print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
    data.to_sql(name='ratings', con=engine, if_exists='append')
    rows_imported += len(data)

    print(f'Done.') 
# STEP 2 Print Elapsed Time
rows_imported = 0
# get the start_time from time.time()
start_time = time.time()
for data in pd.read_csv(f'{file_dir}/ratings.csv', chunksize=1000000):
    print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
    data.to_sql(name='ratings', con=engine, if_exists='append')
    rows_imported += len(data)

    # add elapsed time to final print out
    print(f'Done. {time.time() - start_time} total seconds elapsed')

importing rows 0 to 1000000...Done.
importing rows 1000000 to 2000000...Done.
importing rows 2000000 to 3000000...Done.
importing rows 3000000 to 4000000...Done.
importing rows 4000000 to 5000000...Done.
importing rows 5000000 to 6000000...Done.
importing rows 6000000 to 7000000...Done.
importing rows 7000000 to 8000000...Done.
importing rows 8000000 to 9000000...Done.
importing rows 9000000 to 10000000...Done.
importing rows 10000000 to 11000000...Done.
importing rows 11000000 to 12000000...Done.
importing rows 12000000 to 13000000...Done.
importing rows 13000000 to 14000000...Done.
importing rows 14000000 to 15000000...Done.
importing rows 15000000 to 16000000...Done.
importing rows 16000000 to 17000000...Done.
importing rows 17000000 to 18000000...Done.
importing rows 18000000 to 19000000...Done.
importing rows 19000000 to 20000000...Done.
importing rows 20000000 to 21000000...Done.
importing rows 21000000 to 22000000...Done.
importing rows 22000000 to 23000000...Done.
importing row

In [10]:
# 13. Check the wiki_movies_df DataFrame. 
wiki_movies_df.head()

AttributeError: 'str' object has no attribute 'head'

In [None]:
# 14. Check the movies_with_ratings_df DataFrame.
movies_with_ratings_df.head()

In [None]:
# 15. Check the movies_df DataFrame. 
movies_df.head()