# DS 2002 Midterm: Actor casting based on film history
## Using Python to Perform Extract-Transform-Load (ETL Processing)

#### Import the Necessary Libraries

In [1]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working 

In [2]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context.

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x256a91b5370>

### Create & Populate the Dimension Tables
#### Extract Data from the Source Database Tables

In [5]:
sql_actor = "SELECT * FROM sakila.actor;"
df_actor = get_dataframe(user_id, pwd, host_name, src_dbname, sql_actor)
df_actor.head(2)

Unnamed: 0,actor_id,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-15 04:34:33
1,2,NICK,WAHLBERG,2006-02-15 04:34:33


In [6]:
sql_film_actor = "SELECT * FROM sakila.film_actor;"
df_film_actor = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film_actor)
df_film_actor.head(2)

Unnamed: 0,actor_id,film_id,last_update
0,1,1,2006-02-15 05:05:03
1,1,23,2006-02-15 05:05:03


In [7]:
df_film_actor.rename(columns={"actor_id":"actor_key"}, inplace=True)
df_film_actor.rename(columns={"film_id":"film_key"}, inplace=True)
df_film_actor.head(2)

Unnamed: 0,actor_key,film_key,last_update
0,1,1,2006-02-15 05:05:03
1,1,23,2006-02-15 05:05:03


In [8]:
df_actor.rename(columns={"actor_id":"actor_key"}, inplace=True)
df_actor.head(2)

Unnamed: 0,actor_key,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-15 04:34:33
1,2,NICK,WAHLBERG,2006-02-15 04:34:33


In [9]:
db_operation = "insert"

tables = [('dim_actor', df_actor, 'actor_key'),
          ('dim_film_actor', df_film_actor, 'film_key')]

In [10]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

IntegrityError: (pymysql.err.IntegrityError) (1062, "Duplicate entry '752' for key 'dim_film_actor.PRIMARY'")
[SQL: ALTER TABLE dim_film_actor ADD PRIMARY KEY (film_key);]
(Background on this error at: https://sqlalche.me/e/14/gkpj)

In [11]:
df_actor_films = pd.merge(df_actor, df_film_actor, on='actor_key', how='inner')

df_actor_films.head(2)

Unnamed: 0,actor_key,first_name,last_name,last_update_x,film_key,last_update_y
0,1,PENELOPE,GUINESS,2006-02-15 04:34:33,1,2006-02-15 05:05:03
1,1,PENELOPE,GUINESS,2006-02-15 04:34:33,23,2006-02-15 05:05:03


In [12]:
drop_columns=['last_update_y']
df_actor_films.drop(drop_columns, axis=1, inplace=True)
df_actor_films.rename(columns={"last_update_x":"last_update"}, inplace=True)
df_actor_films.head(2)

Unnamed: 0,actor_key,first_name,last_name,last_update,film_key
0,1,PENELOPE,GUINESS,2006-02-15 04:34:33,1
1,1,PENELOPE,GUINESS,2006-02-15 04:34:33,23


In [13]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"

atlas_cluster_name = "ClusterMidterm.zibbf"
atlas_user_name = "yvf7ua"
atlas_password = "Mpelp3OVXou5Jjrw"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb://yvf7ua:Mpelp3OVXou5Jjrw@ac-q2qcyq5-shard-00-00.7rtqy6x.mongodb.net:27017,ac-q2qcyq5-shard-00-01.7rtqy6x.mongodb.net:27017,ac-q2qcyq5-shard-00-02.7rtqy6x.mongodb.net:27017/?ssl=true&replicaSet=atlas-khukmn-shard-0&authSource=admin&retryWrites=true&w=majority"
}

src_dbname = "sakila"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb://yvf7ua:Mpelp3OVXou5Jjrw@ac-q2qcyq5-shard-00-00.7rtqy6x.mongodb.net:27017,ac-q2qcyq5-shard-00-01.7rtqy6x.mongodb.net:27017,ac-q2qcyq5-shard-00-02.7rtqy6x.mongodb.net:27017/?ssl=true&replicaSet=atlas-khukmn-shard-0&authSource=admin&retryWrites=true&w=majority


In [14]:
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


In [15]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"film" : 'film.json'}

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close() 

In [16]:
query = {}
collection = "film"

df_film = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [17]:
df_film.rename(columns={"film_id":"film_key"}, inplace=True)
drop_cols = ['language_id','original_language_id','rental_duration','rental_rate','special_features',
             'last_update']
df_film.drop(drop_cols, axis=1, inplace=True)
df_film.head(2)

Unnamed: 0,film_key,title,description,release_year,length,replacement_cost,rating
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,86,20.99,PG
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,48,12.99,G


In [18]:
dataframe = df_film
table_name = 'dim_film'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [19]:
sql_film = "SELECT * FROM sakila_dw.dim_film;"
df_dim_film = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_film)
df_dim_film.head(2)

Unnamed: 0,film_key,title,description,release_year,length,replacement_cost,rating
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,86,20.99,PG
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,48,12.99,G


In [20]:
df_fact_table = pd.merge(df_actor_films, df_dim_film, on='film_key', how='right')
df_fact_table.head(2)

Unnamed: 0,actor_key,first_name,last_name,last_update,film_key,title,description,release_year,length,replacement_cost,rating
0,1.0,PENELOPE,GUINESS,2006-02-15 04:34:33,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,86,20.99,PG
1,10.0,CHRISTIAN,GABLE,2006-02-15 04:34:33,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,86,20.99,PG


In [21]:
drop_columns = ['replacement_cost']
df_fact_table.drop(drop_columns, axis=1, inplace=True)

ordered_columns = ['actor_key','first_name','last_name','film_key','title','description','release_year','length','rating',
                  'last_update']
df_fact_table=df_fact_table[ordered_columns]

df_fact_table.head(2)

Unnamed: 0,actor_key,first_name,last_name,film_key,title,description,release_year,length,rating,last_update
0,1.0,PENELOPE,GUINESS,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,86,PG,2006-02-15 04:34:33
1,10.0,CHRISTIAN,GABLE,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,86,PG,2006-02-15 04:34:33


In [22]:
df_fact_table.insert(0, "fact_casting_key", range(1, df_fact_table.shape[0]+1))

In [23]:
dataframe = df_fact_table
table_name = 'fact_casting'
primary_key = 'fact_casting_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)