## Midterm Project - Sakila Rentals

### Prerequisites:

#### Import libraries

In [9]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

# !pip install pymysql
import pymysql

Collecting pymysql
  Downloading PyMySQL-1.0.2-py3-none-any.whl (43 kB)
Installing collected packages: pymysql
Successfully installed pymysql-1.0.2


#### Declare and assign connection variables for the MongoDB server, the MySQL server, and databases

In [10]:
mysql_host = "localhost"
mysql_uid = "root"
mysql_pwd = "Passw0rd123"

atlas_cluster = "ds2002.ebys8h5"
atlas_uid = "vdq8tp"
atlas_pwd = "Iforgotit!01"

mongo_conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_uid}:{atlas_pwd}@{atlas_cluster}.mongodb.net"
}

sql_conn_str = f"mysql+pymysql://{mysql_uid}:{mysql_pwd}@{mysql_host}"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {mongo_conn_str['local']}")
print(f"Atlas Connection String: {mongo_conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://vdq8tp:Iforgotit!01@ds2002.ebys8h5.mongodb.net


#### Define functions for getting data from and setting data into databases

In [19]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{mysql_uid}:{mysql_pwd}@{mysql_host}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(conn_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(conn_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{mysql_uid}:{mysql_pwd}@{mysql_host}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

### MySQL:

#### ETL directly from source database into destination database

In [22]:
sqlEngine = create_engine(sql_conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x15f20842c70>

In [24]:
sql_customers = "SELECT * FROM sakila.customer;"
df_customers = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_host, src_dbname, sql_customers)
df_customers.drop(['address_id', 'last_update'], axis=1, inplace=True)
df_customers.rename(columns={"customer_id":"customer_key", "store_id":"store_key"}, inplace=True)
df_customers.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,active,create_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,2006-02-14 22:04:36


In [25]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_host, src_dbname, sql_staff)
df_staff.drop(['address_id', 'picture', 'last_update'], axis=1, inplace=True)
df_staff.rename(columns={"staff_id":"staff_key", "store_id":"store_key"}, inplace=True)
df_staff.head(2)

Unnamed: 0,staff_key,first_name,last_name,email,store_key,active,username,password
0,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,2,1,Jon,


In [26]:
sql_stores = "SELECT * FROM sakila.store;"
df_stores = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_host, src_dbname, sql_stores)
df_stores.drop(['address_id', 'last_update'], axis=1, inplace=True)
df_stores.rename(columns={"store_id":"store_key", "manager_staff_id":"manager_staff_key"}, inplace=True)
df_stores.head(2)

Unnamed: 0,store_key,manager_staff_key
0,1,1
1,2,2


In [27]:
# include date
db_operation = "insert"
tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_staff', df_staff, 'staff_key'),
          ('dim_stores', df_stores, 'store_key'),
          ]
for table_name, dataframe, primary_key in tables:
    set_dataframe(mysql_uid, mysql_pwd, mysql_host, dst_dbname, dataframe, table_name, primary_key, db_operation)

### MongoDB

#### Export dataframes into JSON and populate MongoDB with source data

In [None]:
#export from mysql into json

In [21]:
client = pymongo.MongoClient(conn_str["local"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"films" : 'sakila_films.json',
              "languages" : 'sakila_languages'
              "rentals" : 'sakila_rentals.json',
              "inventory" : 'sakila_inventory.json',
              "payments" : 'sakila_payments.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()        

#### ETL film dimension table into destination database

In [23]:
query = {}
collection = "films"

df_films = get_mongo_dataframe(mongo_conn_str['local'], src_dbname, collection, query)
df_films.drop(['last_update'], axis=1, inplace=True)
df_films.head(2)

Unnamed: 0,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,language
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",English
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",English


In [None]:
query = {}
collection = "language"

df_languages = get_mongo_dataframe(mongo_conn_str['local'], src_dbname, collection, query)
df_languages.drop(['last_update'], axis=1, inplace=True)
df_languages.rename(columns={"name":"language"}, inplace=True)
df_languages.head(2)

In [None]:
df_films = pd.merge(df_films, df_languages, on='original_language_id', how='left')
df_films.drop(['original_language_id'], axis=1, inplace=True)
df_films.rename(columns={"language":"original_language"}, inplace=True)
df_films = pd.merge(df_films, df_languages, on='language_id', how='left')
df_films.drop(['language_id'], axis=1, inplace=True)
df_films.head(2)

In [None]:
# reorder columns
ordered_columns = []
df_films = df_films[ordered_columns]
df_films.rename(columns={"film_id":"film_key"}, inplace=True)
df_films.head(2)

In [None]:
table_name = "dim_films"
primary_key = "film_key"
db_operation = "insert"
set_dataframe(sql_conn_str, dst_dbname, df_films, table_name, primary_key, db_operation)

#### ETL rental fact table into destination database

In [None]:
query = {}
collection = "rentals"

df_rentals = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_rentals.rename(columns={"customer_id":"renting_customer_key", "staff_id":"rental_staff_key"}, inplace=True)
df_rentals.drop(['last_update'], axis=1, inplace=True)
df_rentals.head(2)

In [None]:
query = {}
collection = "inventory"

df_inventory = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_inventory.drop(['last_update'], axis=1, inplace=True)
df_inventory.head(2)

In [None]:
query = {}
collection = "payments"

df_payments = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_payments.rename(columns={"customer_id":"paying_customer_key", "staff_id":"cashier_staff_key"}, inplace=True)
df_payments.drop(['last_update'], axis=1, inplace=True)
df_payments.head(2)

In [None]:
df_fact_rentals = pd.merge(df_rentals, df_inventory, on='inventory_id', how='left')
df_fact_rentals.drop(['inventory_id'], axis=1, inplace=True)
df_fact_rentals = pd.merge(df_rentals, df_payments, on='rental_id', how='left')
df_fact_rentals.drop(['payment_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

In [None]:
# reorder columns
ordered_columns = []
df_fact_rentals = df_films[ordered_columns]
df_fact_rentals.rename(columns={"rental_id":"rental_key"}, inplace=True)
df_fact_rentals.insert(0, "fact_rental_key", range(1, df_fact_rentals.shape[0]+1))
df_fact_rentals.head(2)

In [None]:
table_name = "fact_rentals"
primary_key = "fact_rental_key"
db_operation = "insert"
set_dataframe(sql_conn_str, dst_dbname, df_fact_rentals, table_name, primary_key, db_operation)

#### Check that all tables were created, populated, and inserted correctly into the destination database