## DS-2002 Project 1
### Shireen Shah sts5dcm

The Sakila sample database represents a DVD rental store. This notebook extracts Sakila data from MySQL and from JSON files (one of which is staged in MongoDB), transforms it with pandas, and loads the result back into MySQL.

#### Import the Necessary Libraries

In [124]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases 

In [None]:
# Connection settings for the MySQL and MongoDB servers.
# Each credential is taken from an environment variable when one is set, so
# secrets can live outside the notebook; the literal fallbacks keep a plain
# local run working exactly as before.
# NOTE(review): rotate these passwords and remove the literal fallbacks before
# sharing or committing this notebook.
mysql_uid = os.environ.get("MYSQL_UID", "root")
mysql_pwd = os.environ.get("MYSQL_PWD", "p00k1e123")
mysql_hostname = os.environ.get("MYSQL_HOSTNAME", "127.0.0.1")

atlas_cluster_name = os.environ.get("ATLAS_CLUSTER_NAME", "Sally.tiurofi")
atlas_user_name = os.environ.get("ATLAS_USER_NAME", "sts5dcm")
atlas_password = os.environ.get("ATLAS_PASSWORD", "p00k1e")

# MongoDB connection strings keyed by deployment ("local" or "atlas").
ma_conn_str = {
    "local": "mongodb://localhost:27017/",
    "atlas": f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net",
}

msq_src_dbname = "sakila"        # MySQL schema the rental data is read from
ma_src_dbname = "sakila_tables"  # MongoDB database that stages the JSON data
dst_dbname = "sakila_dm"         # MySQL schema the results are written to

print(f"Local Connection String: {ma_conn_str['local']}")
# Show the Atlas string with the password masked so the saved cell output
# does not leak it.
print(f"Atlas Connection String: mongodb+srv://{atlas_user_name}:****@{atlas_cluster_name}.mongodb.net")

#### Define Functions for Retrieval and Loading

In [136]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    """Run a query against a MySQL database and return the rows as a DataFrame.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials placed in the connection URL.
    host_name : str
        Host of the MySQL server.
    db_name : str
        Database named in the connection URL.
    sql_query : str
        Query passed to ``pd.read_sql``.

    Returns
    -------
    pandas.DataFrame
        The result of ``pd.read_sql`` for ``sql_query``.
    """
    msq_conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(msq_conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when the query raises.
        with sqlEngine.connect() as conn:
            return pd.read_sql(sql_query, conn)
    finally:
        # A new engine is created on every call, so release its pool here.
        sqlEngine.dispose()

def get_mongo_dataframe(connect_str, db_name, collection, query):
    """Fetch documents from a MongoDB collection into a DataFrame.

    Parameters
    ----------
    connect_str : str
        Connection string passed to ``pymongo.MongoClient``.
    db_name : str
        Name of the database to read from.
    collection : str
        Name of the collection to query.
    query : dict
        Filter passed to ``find``.

    Returns
    -------
    pandas.DataFrame
        One row per returned document, without the ``_id`` column.
    """
    client = pymongo.MongoClient(connect_str)
    try:
        # Materialise the cursor while the client is still open.
        documents = list(client[db_name][collection].find(query))
    finally:
        client.close()

    dframe = pd.DataFrame(documents)
    # A query that matches nothing yields a frame with no columns at all, so
    # tolerate a missing '_id' instead of raising KeyError.
    return dframe.drop(columns=['_id'], errors='ignore')

def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    """Write a DataFrame to a MySQL table.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials placed in the connection URL.
    host_name : str
        Host of the MySQL server.
    db_name : str
        Database named in the connection URL.
    df : pandas.DataFrame
        Data to write; its index is not written.
    table_name : str
        Destination table.
    pk_column : str
        Column made the primary key after an ``"insert"``.
    db_operation : str
        ``"insert"`` replaces the table with ``df`` and then adds a primary
        key on ``pk_column``; ``"update"`` appends ``df`` to the table.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    """
    # Fail loudly on a typo instead of silently writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"db_operation must be 'insert' or 'update', got {db_operation!r}"
        )

    msq_conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(msq_conn_str, pool_recycle=3600)

    try:
        # Engine.execute() no longer exists in SQLAlchemy 2.0, so both the
        # load and the ALTER run on one connection. begin() commits on
        # success and closes the connection either way.
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.exec_driver_sql(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # A new engine is created on every call, so release its pool here.
        sqlEngine.dispose()

### Create Destination Database in MySQL

In [None]:
# (Re)create an empty destination schema. Dropping it also removes any
# dim_date table it held, so Create_Populate_Dim_Date.sql has to be run again
# afterwards.
msq_conn_str = f"mysql+pymysql://{mysql_uid}:{mysql_pwd}@{mysql_hostname}"
sqlEngine = create_engine(msq_conn_str, pool_recycle=3600)

# Engine.execute() was removed in SQLAlchemy 2.0; issue the statements on an
# explicit connection that is closed when the block exits.
with sqlEngine.begin() as connection:
    connection.exec_driver_sql(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
    connection.exec_driver_sql(f"CREATE DATABASE `{dst_dbname}`;")
    connection.exec_driver_sql(f"USE {dst_dbname};")

#### Populate MongoDB with Source Data
You only need to run this cell once; however, the operation is *idempotent*.  In other words, it can be run multiple times without changing the end result.

In [None]:
# Populate MongoDB from the local JSON files.

# The connection strings are defined as `ma_conn_str` in the configuration
# cell; the name `uri` is never assigned in this notebook.
client = pymongo.MongoClient(ma_conn_str["atlas"])
db = client[ma_src_dbname]

data_dir = os.path.join(os.getcwd(), 'sakila-db')

# collection name -> JSON file (inside data_dir) that fills it
json_files = {"customer" : 'sakila_customer.json'
             }

try:
    for collection_name, file_name in json_files.items():
        # Dropping first is what makes re-running this cell idempotent.
        db.drop_collection(collection_name)
        json_file = os.path.join(data_dir, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        result = db[collection_name].insert_many(json_object)
finally:
    # Close the client even if a file is missing or an insert fails.
    client.close()

### Create and Populate the New Dimension Tables

In [None]:
# Read every customer document from MongoDB (an empty filter matches all).
query = {}
collection = "customer"

# The connection strings are defined as `ma_conn_str` in the configuration
# cell; the name `uri` is never assigned in this notebook.
df_customer = get_mongo_dataframe(ma_conn_str['atlas'], ma_src_dbname, collection, query)
df_customer.head()

In [None]:
# Pull every row of the source rental table out of MySQL into a DataFrame.

sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_sql_dataframe(
    user_id=mysql_uid,
    pwd=mysql_pwd,
    host_name=mysql_hostname,
    db_name=msq_src_dbname,
    sql_query=sql_rental,
)
df_rental.head()

#### Extract Date Dimension Data from the Destination MySQL Schema (`sakila_dm`) Into a DataFrame


In [None]:
# Prerequisite: run Create_Populate_Dim_Date.sql so that sakila_dm.dim_date
# exists and is populated before running this cell.

sql_dim_date = "SELECT date_key, full_date FROM sakila_dm.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_date)
# A unit-less astype('datetime64') is rejected by pandas 2.x; pd.to_datetime
# performs the same conversion on old and new versions alike.
df_dim_date['full_date'] = pd.to_datetime(df_dim_date['full_date'])
df_dim_date.head(2)

#### Extract Payment Data from the Local File Source Into DataFrames

In [None]:
# Read the payment data from the local JSON file.

# Resolve the file relative to the working directory (the same 'sakila-db'
# folder the MongoDB load uses) rather than one user's absolute home path,
# so the notebook runs on other machines.
json_pathname = os.path.join(os.getcwd(), 'sakila-db', 'sakila_payment.json')

df_payment = pd.read_json(json_pathname)

df_payment.head()

### Transformations

In [None]:
# Customer dimension: switch the id columns to *_key names, prefix the
# descriptive columns with "customer_", and discard the bookkeeping columns.

customer_column_names = {
    "customer_id": "customer_key",
    "store_id": "store_key",
    "first_name": "customer_first_name",
    "last_name": "customer_last_name",
    "email": "customer_email",
    'address_id': 'address_key',
}

df_customer = (
    df_customer
    .rename(columns=customer_column_names)
    .drop(['last_update', 'active', 'create_date'], axis=1)
)

df_customer.head()

In [None]:
# Payment: rename the ids to *_key names and keep only the date part of
# payment_date.

df_payment.rename(columns={"payment_id" : "payment_key", "rental_id":"rental_key"}, inplace=True)

# Split "date time" at the first space. `n` has to be passed by keyword:
# pandas 2.x accepts only the pattern positionally.
df_payment[['payment_date', 'payment_time']] = df_payment['payment_date'].str.split(' ', n=1, expand=True)

df_payment.drop(['customer_id', 'staff_id','last_update', 'payment_time'], axis=1, inplace=True)

df_payment.head()

In [None]:
# Rental: rename the ids to *_key names and keep only the date part of
# rental_date and return_date.

df_rental.rename(columns={"rental_id" : "rental_key", "inventory_id":"inventory_key", "customer_id":"customer_key",
                         "staff_id":"staff_key"}, inplace=True)

# Cast to str first so the .str accessor works, then split "date time" at the
# first space. `n` has to be passed by keyword: pandas 2.x accepts only the
# pattern positionally.
df_rental['rental_date'] = df_rental['rental_date'].astype(str)
df_rental[['rental_date', 'rental_time']] = df_rental['rental_date'].str.split(' ', n=1, expand=True)

df_rental['return_date'] = df_rental['return_date'].astype(str)
df_rental[['return_date', 'return_time']] = df_rental['return_date'].str.split(' ', n=1, expand=True)

df_rental.drop(['last_update', 'rental_time', 'return_time'], axis=1, inplace=True)

df_rental.head()

### Create & Transform Fact Table

In [None]:
# Join the three frames into one row per rental, carrying the rental's
# payment (where one exists) and the renting customer's attributes.

# df_payment has no customer column by this point (customer_id is dropped in
# the payment transformation), so it cannot be merged with df_customer on
# customer_key. Attach payments to rentals first; df_rental supplies the
# customer_key that the customer merge then uses.
df_joined = pd.merge(df_payment, df_rental, on = 'rental_key', how = 'right')

df_joined = pd.merge(df_joined, df_customer, on = 'customer_key', how = 'left')

df_joined.head()

In [None]:
# Look up date keys, keep and order the fact columns, and add a primary key.

# The fact table stores surrogate date keys, not calendar dates. Map each date
# column to the date_key of the matching dim_date row; without this step the
# *_date_key columns selected below do not exist.
date_key_by_day = dict(zip(pd.to_datetime(df_dim_date['full_date']).dt.normalize(),
                           df_dim_date['date_key']))

for date_column in ('rental_date', 'return_date', 'payment_date'):
    # errors='coerce' turns missing or unparseable dates into NaT, which maps
    # to a missing key.
    days = pd.to_datetime(df_joined[date_column], errors='coerce').dt.normalize()
    # Int64 keeps the keys integral even when some are missing.
    # NOTE(review): assumes dim_date.date_key is an integer column - confirm.
    df_joined[f'{date_column}_key'] = days.map(date_key_by_day).astype('Int64')

# Selecting the fact columns also discards the date, amount and descriptive
# customer columns, so no separate drop is needed.
reordered = [ 'rental_date_key', 'return_date_key', 'payment_key', 'payment_date_key',
                    'inventory_key', 'customer_key', 'staff_key', 'store_key', 'address_key']
df_joined = df_joined[reordered].copy()

# primary key: a running number starting at 1
df_joined.insert(0, "fact_rental_purchase_key", range(1, df_joined.shape[0]+1))

df_joined.head()

### Drop Unnecessary Keys

In [None]:
# Remove the key columns that are not loaded with the dimension tables.
df_customer = df_customer.drop(['store_key', 'address_key'], axis=1)
df_payment = df_payment.drop(['rental_key', 'payment_date'], axis=1)

# Only the last expression of a cell is rendered, so show both previews
# explicitly instead of leaving a bare head() call that displays nothing.
display(df_customer.head(), df_payment.head())

### Load back into MySQL

In [None]:
# Load the fact table into the destination schema ("insert" replaces any
# existing table and adds the primary key).

table_name, primary_key = 'fact_rental_payment', 'fact_rental_purchase_key'
dataframe, db_operation = df_joined, "insert"

set_dataframe(
    user_id=mysql_uid,
    pwd=mysql_pwd,
    host_name=mysql_hostname,
    db_name=dst_dbname,
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
)

In [None]:
# Load the customer dimension into the destination schema ("insert" replaces
# any existing table and adds the primary key).

table_name, primary_key = 'dim_customer', 'customer_key'
dataframe, db_operation = df_customer, "insert"

set_dataframe(
    user_id=mysql_uid,
    pwd=mysql_pwd,
    host_name=mysql_hostname,
    db_name=dst_dbname,
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
)

In [None]:
# Load the payment dimension into the destination schema ("insert" replaces
# any existing table and adds the primary key).

table_name, primary_key = 'dim_payment', 'payment_key'
dataframe, db_operation = df_payment, "insert"

set_dataframe(
    user_id=mysql_uid,
    pwd=mysql_pwd,
    host_name=mysql_hostname,
    db_name=dst_dbname,
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
)

### Validate in MySQL

In [None]:
# Read the fact table back from MySQL to confirm the load.
sql_joined = "SELECT * FROM sakila_dm.fact_rental_payment;"
df_sql_joined = get_sql_dataframe(
    user_id=mysql_uid,
    pwd=mysql_pwd,
    host_name=mysql_hostname,
    db_name=dst_dbname,
    sql_query=sql_joined,
)
df_sql_joined.head()

In [None]:
# Read the customer dimension back from MySQL to confirm the load.
sql_customer = "SELECT * FROM sakila_dm.dim_customer;"
df_sql_customer = get_sql_dataframe(
    user_id=mysql_uid,
    pwd=mysql_pwd,
    host_name=mysql_hostname,
    db_name=dst_dbname,
    sql_query=sql_customer,
)
df_sql_customer.head()

In [None]:
# Read the payment dimension back from MySQL to confirm the load.
sql_payment = "SELECT * FROM sakila_dm.dim_payment;"
df_sql_payment = get_sql_dataframe(
    user_id=mysql_uid,
    pwd=mysql_pwd,
    host_name=mysql_hostname,
    db_name=dst_dbname,
    sql_query=sql_payment,
)
df_sql_payment.head()