### Extract Data 

The CSV files for the project can be found under the `data` folder.

Required libraries:
- psycopg2
- pandas
- sqlalchemy 

In [17]:
import psycopg2
import pandas as pd
import sqlalchemy

Connecting to a PostgreSQL database that is hosted on Google Cloud Platform.

Update the `pwd` variable and the host value with the password and host used to connect to the database.
Typically the password is extracted from an environment variable and is stored in a separate file.

In [18]:
import os
from urllib.parse import quote_plus

# Connection settings. Set PGHOST / PGPASSWORD in the environment, or edit the
# fallback placeholders below. A single `host` value feeds both the psycopg2
# connection and the SQLAlchemy engine so the two can never point at different
# servers.
host = os.environ.get("PGHOST", "xx.xx.xx.xx")
pwd = os.environ.get("PGPASSWORD", "xxxxxxxxx")

try:
    # Keyword arguments instead of a formatted DSN string: a password that
    # contains spaces or quotes would otherwise corrupt the DSN.
    conn = psycopg2.connect(
        host=host,
        port=5432,
        dbname="postgres",
        user="postgres",
        password=pwd,
    )
    # Every statement sent through `cur` is committed immediately.
    conn.set_session(autocommit=True)

    cur = conn.cursor()

    # quote_plus escapes characters such as '@' or '/' that would break the URL.
    # isolation_level="AUTOCOMMIT" puts every pooled connection in autocommit
    # mode, replacing the previous raw_connection().set_session(...) call that
    # checked a connection out of the pool without returning it explicitly.
    engine = sqlalchemy.create_engine(
        "postgresql://postgres:{}@{}:5432/postgres".format(quote_plus(pwd), host),
        isolation_level="AUTOCOMMIT",
    )
    # create_engine is lazy; open and close one connection so bad credentials
    # are reported here rather than at the first to_sql call.
    with engine.connect():
        pass
except Exception as e:
    print("Error occurred while connecting to the database.")
    print(e)
    # Re-raise: without conn/cur/engine every later cell would fail with a
    # misleading NameError instead of the real connection error.
    raise

Read all the CSV files into pandas DataFrames. Then show information about each DataFrame and store them in staging tables.

In [19]:
# Load the four IMDb CSV exports, one DataFrame per file, in the order listed.
names, ratings, title_principals, movies = (
    pd.read_csv(csv_path)
    for csv_path in (
        "data/movie-datasets/IMDb_names.csv",
        "data/movie-datasets/IMDb_ratings.csv",
        "data/movie-datasets/IMDb_title_principals.csv",
        "data/movie-datasets/IMDb_movies.csv",
    )
)

In [20]:
# Summarise the names DataFrame: column names, non-null counts and dtypes.
names.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 297705 entries, 0 to 297704
Data columns (total 17 columns):
 #   Column                 Non-Null Count   Dtype  
---  ------                 --------------   -----  
 0   imdb_name_id           297705 non-null  object 
 1   name                   297705 non-null  object 
 2   birth_name             297705 non-null  object 
 3   height                 44681 non-null   float64
 4   bio                    204698 non-null  object 
 5   birth_details          110612 non-null  object 
 6   date_of_birth          110612 non-null  object 
 7   place_of_birth         103992 non-null  object 
 8   death_details          39933 non-null   object 
 9   date_of_death          39933 non-null   object 
 10  place_of_death         37038 non-null   object 
 11  reason_of_death        22694 non-null   object 
 12  spouses_string         45352 non-null   object 
 13  spouses                297705 non-null  int64  
 14  divorces               297705 non-nu

In [21]:
# Rebuild the stg_names staging table: drop any existing copy, then create it
# with every CSV column typed as varchar plus a created_date load timestamp
# that defaults to now().
drop_stg_names_sql = """
    DROP TABLE IF EXISTS stg_names
"""
create_stg_names_sql = """
    CREATE TABLE IF NOT EXISTS stg_names(
        imdb_name_id varchar,
        name varchar,
        birth_name varchar,
        height varchar,
        bio varchar,
        birth_details varchar,
        date_of_birth varchar,
        place_of_birth varchar,
        death_details varchar,
        date_of_death varchar,
        place_of_death varchar,
        reason_of_death varchar,
        spouses_string varchar,
        spouses varchar,
        divorces varchar,
        spouses_with_children varchar,
        children varchar,
        created_date timestamp default now()
    );
"""

cur.execute(drop_stg_names_sql)
cur.execute(create_stg_names_sql)

In [22]:
# Summarise the ratings DataFrame: column names, non-null counts and dtypes.
ratings.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 85855 entries, 0 to 85854
Data columns (total 49 columns):
 #   Column                     Non-Null Count  Dtype  
---  ------                     --------------  -----  
 0   imdb_title_id              85855 non-null  object 
 1   weighted_average_vote      85855 non-null  float64
 2   total_votes                85855 non-null  int64  
 3   mean_vote                  85855 non-null  float64
 4   median_vote                85855 non-null  float64
 5   votes_10                   85855 non-null  int64  
 6   votes_9                    85855 non-null  int64  
 7   votes_8                    85855 non-null  int64  
 8   votes_7                    85855 non-null  int64  
 9   votes_6                    85855 non-null  int64  
 10  votes_5                    85855 non-null  int64  
 11  votes_4                    85855 non-null  int64  
 12  votes_3                    85855 non-null  int64  
 13  votes_2                    85855 non-null  int

In [23]:
# Rebuild the stg_ratings staging table: drop any existing copy, then create it
# with every CSV column typed as varchar plus a created_date load timestamp
# that defaults to now().
drop_stg_ratings_sql = """
    DROP TABLE IF EXISTS stg_ratings
"""
create_stg_ratings_sql = """
    CREATE TABLE IF NOT EXISTS stg_ratings(
        imdb_title_id varchar,
        weighted_average_vote varchar,
        total_votes varchar,
        mean_vote varchar,
        median_vote varchar,
        votes_10 varchar,
        votes_9 varchar,
        votes_8 varchar,
        votes_7 varchar,
        votes_6 varchar,
        votes_5 varchar,
        votes_4 varchar,
        votes_3 varchar,
        votes_2 varchar,
        votes_1 varchar,
        allgenders_0age_avg_vote varchar,
        allgenders_0age_votes varchar,
        allgenders_18age_avg_vote varchar,
        allgenders_18age_votes varchar,
        allgenders_30age_avg_vote varchar,
        allgenders_30age_votes varchar,
        allgenders_45age_avg_vote varchar,
        allgenders_45age_votes varchar,
        males_allages_avg_vote varchar,
        males_allages_votes varchar,
        males_0age_avg_vote varchar,
        males_0age_votes varchar,
        males_18age_avg_vote varchar,
        males_18age_votes varchar,
        males_30age_avg_vote varchar,
        males_30age_votes varchar,
        males_45age_avg_vote varchar,
        males_45age_votes varchar,
        females_allages_avg_vote varchar,
        females_allages_votes varchar,
        females_0age_avg_vote varchar,
        females_0age_votes varchar,
        females_18age_avg_vote varchar,
        females_18age_votes varchar,
        females_30age_avg_vote varchar,
        females_30age_votes varchar,
        females_45age_avg_vote varchar,
        females_45age_votes varchar,
        top1000_voters_rating varchar,
        top1000_voters_votes varchar,
        us_voters_rating varchar,
        us_voters_votes varchar,
        non_us_voters_rating varchar,
        non_us_voters_votes varchar,
        created_date timestamp default now()
    );
"""

cur.execute(drop_stg_ratings_sql)
cur.execute(create_stg_ratings_sql)

In [24]:
# Summarise the title_principals DataFrame: column names, non-null counts and dtypes.
title_principals.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 835513 entries, 0 to 835512
Data columns (total 6 columns):
 #   Column         Non-Null Count   Dtype 
---  ------         --------------   ----- 
 0   imdb_title_id  835513 non-null  object
 1   ordering       835513 non-null  int64 
 2   imdb_name_id   835513 non-null  object
 3   category       835513 non-null  object
 4   job            212731 non-null  object
 5   characters     340836 non-null  object
dtypes: int64(1), object(5)
memory usage: 38.2+ MB


In [25]:
# Rebuild the stg_title_principals staging table: drop any existing copy, then
# create it with every CSV column typed as varchar plus a created_date load
# timestamp that defaults to now().
drop_stg_title_principals_sql = """
    DROP TABLE IF EXISTS stg_title_principals
"""
create_stg_title_principals_sql = """
    CREATE TABLE IF NOT EXISTS stg_title_principals(
        imdb_title_id varchar,
        ordering varchar,
        imdb_name_id varchar,
        category varchar,
        job varchar,
        characters varchar,
        created_date timestamp default now()
    );
"""

cur.execute(drop_stg_title_principals_sql)
cur.execute(create_stg_title_principals_sql)

In [26]:
# Summarise the movies DataFrame: column names, non-null counts and dtypes.
movies.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 85855 entries, 0 to 85854
Data columns (total 22 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   imdb_title_id          85855 non-null  object 
 1   title                  85855 non-null  object 
 2   original_title         85855 non-null  object 
 3   year                   85855 non-null  int64  
 4   date_published         85855 non-null  object 
 5   genre                  85855 non-null  object 
 6   duration               85855 non-null  int64  
 7   country                85791 non-null  object 
 8   language               85022 non-null  object 
 9   director               85768 non-null  object 
 10  writer                 84283 non-null  object 
 11  production_company     81400 non-null  object 
 12  actors                 85786 non-null  object 
 13  description            83740 non-null  object 
 14  avg_vote               85855 non-null  float64
 15  vo

In [27]:
# Rebuild the stg_movies staging table: drop any existing copy, then create it
# with every CSV column typed as varchar plus a created_date load timestamp
# that defaults to now().
# NOTE(review): the "worlwide_gross_income" spelling presumably mirrors the CSV
# header that to_sql uses for column matching - confirm before renaming it.
drop_stg_movies_sql = """
    DROP TABLE IF EXISTS stg_movies
"""
create_stg_movies_sql = """
    CREATE TABLE IF NOT EXISTS stg_movies(
        imdb_title_id varchar,
        title varchar,
        original_title varchar,
        year varchar,
        date_published varchar,
        genre varchar,
        duration varchar,
        country varchar,
        language varchar,
        director varchar,
        writer varchar,
        production_company varchar,
        actors varchar,
        description varchar,
        avg_vote varchar,
        votes varchar,
        budget varchar,
        usa_gross_income varchar,
        worlwide_gross_income varchar,
        metascore varchar,
        reviews_from_users varchar,
        reviews_from_critics varchar,
        created_date timestamp default now()
    );
"""

cur.execute(drop_stg_movies_sql)
cur.execute(create_stg_movies_sql)

Delete any rows already loaded today before inserting, so that re-running the load on the same day does not store duplicate data.

In [28]:
# Remove rows whose created_date falls on today, then append the names
# DataFrame to stg_names in chunks of 500 rows.
purge_stg_names_sql = """
    DELETE FROM stg_names 
    WHERE date_trunc('day', created_date) = date_trunc('day', now())
"""
cur.execute(purge_stg_names_sql)

names.to_sql(
    "stg_names",
    con=engine,
    if_exists="append",
    index=False,
    method="multi",
    chunksize=500,
)

In [29]:
# Remove rows whose created_date falls on today, then append the ratings
# DataFrame to stg_ratings in chunks of 500 rows.
purge_stg_ratings_sql = """
    DELETE FROM stg_ratings 
    WHERE date_trunc('day', created_date) = date_trunc('day', now())
"""
cur.execute(purge_stg_ratings_sql)

ratings.to_sql(
    "stg_ratings",
    con=engine,
    if_exists="append",
    index=False,
    method="multi",
    chunksize=500,
)

In [30]:
# Remove rows whose created_date falls on today so a same-day re-run does not
# duplicate data.
cur.execute("""
    DELETE FROM stg_title_principals 
    WHERE date_trunc('day', created_date) = date_trunc('day', now())
""")

# Append the title_principals DataFrame in chunks of 500 rows. The previous
# code called `titles.to_sql(...)`, but no `titles` variable is defined in the
# notebook, so it raised NameError on a fresh kernel.
title_principals.to_sql('stg_title_principals', con=engine, if_exists="append", index=False, method="multi", chunksize=500)

In [31]:
# Remove rows whose created_date falls on today, then append the movies
# DataFrame to stg_movies in chunks of 500 rows.
purge_stg_movies_sql = """
    DELETE FROM stg_movies
    WHERE date_trunc('day', created_date) = date_trunc('day', now())
"""
cur.execute(purge_stg_movies_sql)

movies.to_sql(
    "stg_movies",
    con=engine,
    if_exists="append",
    index=False,
    method="multi",
    chunksize=500,
)

In [32]:
# For each staging table, count the rows whose created_date falls on today and
# print the total.
for stg_table in ("stg_names", "stg_ratings", "stg_title_principals", "stg_movies"):
    count_today_sql = (
        "SELECT count(*) from {} "
        "WHERE date_trunc('day', created_date) = date_trunc('day', now())"
    ).format(stg_table)
    cur.execute(count_today_sql)
    loaded_rows = cur.fetchone()[0]
    print("Total number of rows inserted into {} is : {}".format(stg_table, loaded_rows))

Total number of rows inserted into stg_names is : 297705
Total number of rows inserted into stg_ratings is : 85855
Total number of rows inserted into stg_title_principals is : 835513
Total number of rows inserted into stg_movies is : 85855
