## DS2002 Midterm - Mina Gorani (cap9et)

#### Script summary:
Three data tables from Sakila (customer from MongoDB, rental from MySQL, and payment from a local JSON file) were used to populate three dataframes that were then merged to create a fact table. Prior to merging the dataframes into the fact table, the time of day was removed from the three date columns. The dim_date table was then used to create foreign keys in the fact table for the rental, return, and payment dates. The rental and return dates were the only non-key columns kept from the rental dataframe, which made a separate rental dimension table unnecessary. The remaining two dataframes (customer and payment) were transformed into dimension tables. The two dimension tables and the fact table were loaded back into MySQL. 

#### Import the Necessary Libraries

In [1]:
import datetime
import json
import os
from urllib.parse import quote

import numpy
import pandas as pd
import pymongo
from sqlalchemy import create_engine

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases 

In [2]:
# Connection settings. Secrets are read from environment variables instead of being
# hard-coded: notebooks (and their saved outputs) are easily shared or committed.
# Set MYSQL_PWD and ATLAS_PASSWORD (optionally MYSQL_UID, MYSQL_HOST, ATLAS_USER,
# ATLAS_CLUSTER) in the environment before starting Jupyter.
mysql_uid = os.environ.get("MYSQL_UID", "root")
mysql_pwd = os.environ.get("MYSQL_PWD", "")
mysql_hostname = os.environ.get("MYSQL_HOST", "localhost")

atlas_cluster_name = os.environ.get("ATLAS_CLUSTER", "ds2002cluster.m09nd7q")
atlas_user_name = os.environ.get("ATLAS_USER", "cap9et")
atlas_password = os.environ.get("ATLAS_PASSWORD", "")

# Percent-encode the credentials so characters such as '@', ':' or '/' in a
# password cannot corrupt the MongoDB URI.
_atlas_userinfo = f"{quote(atlas_user_name, safe='')}:{quote(atlas_password, safe='')}"
ma_conn_str = {"local" : "mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{_atlas_userinfo}@{atlas_cluster_name}.mongodb.net"
}

msq_src_dbname = "sakila"
ma_src_dbname = "sakila_tables"
dst_dbname = "sakila_dm"

# Never echo the password into the notebook output; show a masked Atlas string.
print(f"Local Connection String: {ma_conn_str['local']}")
print(f"Atlas Connection String: mongodb+srv://{atlas_user_name}:****@{atlas_cluster_name}.mongodb.net")
      

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://cap9et:****@ds2002cluster.m09nd7q.mongodb.net


#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Run ``sql_query`` against a MySQL database and return the result as a DataFrame.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials. The password is percent-encoded before it is placed in
        the connection URL, so characters such as '@' or '/' are safe.
    host_name : str
        MySQL server host.
    db_name : str
        Default database (schema) of the connection.
    sql_query : str
        SQL statement, passed unchanged to ``pd.read_sql``.

    Returns
    -------
    pandas.DataFrame
        The query result.
    '''
    msq_conn_str = f"mysql+pymysql://{user_id}:{quote(pwd, safe='')}@{host_name}/{db_name}"
    sqlEngine = create_engine(msq_conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the query raises.
        with sqlEngine.connect() as conn:
            dframe = pd.read_sql(sql_query, conn)
    finally:
        # A fresh engine (with its own connection pool) is built on every call;
        # dispose of it so pooled connections are not left open afterwards.
        sqlEngine.dispose()

    return dframe

# get_sql_dataframe: runs a SQL query against the MySQL server and returns the result as a DataFrame

def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Return the documents of a MongoDB collection that match ``query`` as a DataFrame.

    Parameters
    ----------
    connect_str : str
        MongoDB connection URI (local or Atlas).
    db_name : str
        Database that holds the collection.
    collection : str
        Collection name.
    query : dict
        MongoDB filter document; ``{}`` matches every document.

    Returns
    -------
    pandas.DataFrame
        One row per matching document. The MongoDB ``_id`` field is excluded.
    '''
    client = pymongo.MongoClient(connect_str)
    try:
        db = client[db_name]
        # Exclude _id with a projection instead of dropping the column afterwards.
        # Dropping it afterwards raises KeyError when nothing matches, because an
        # empty DataFrame has no '_id' column.
        documents = list(db[collection].find(query, {'_id': 0}))
    finally:
        # Close the client even if the query fails.
        client.close()
    return pd.DataFrame(documents)

# get_mongo_dataframe: queries a MongoDB collection and returns the matching documents (without _id) as a DataFrame

def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Write a DataFrame into a MySQL table.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials. The password is percent-encoded for the connection URL.
    host_name : str
        MySQL server host.
    db_name : str
        Database (schema) that receives the table.
    df : pandas.DataFrame
        Rows to write; the DataFrame index is not written.
    table_name : str
        Destination table.
    pk_column : str
        Column made the primary key when ``db_operation == "insert"``.
    db_operation : str
        "insert" - create the table from ``df``, replacing any existing table of
                   that name, then add a primary key on ``pk_column``.
        "update" - append the rows of ``df`` to the table.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither "insert" nor "update".
    '''
    # Fail loudly on a typo instead of silently writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(f"db_operation must be 'insert' or 'update', got {db_operation!r}")

    msq_conn_str = f"mysql+pymysql://{user_id}:{quote(pwd, safe='')}@{host_name}/{db_name}"
    sqlEngine = create_engine(msq_conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if a write fails.
        with sqlEngine.connect() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # Release the per-call connection pool.
        sqlEngine.dispose()
    
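# set_dataframe: writes a DataFrame into a MySQL table ("insert" replaces the table and adds a primary key; "update" appends rows).
# Note: the sakila-db folder with the JSON data must be in the same directory as this notebook for the load cells below.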

### Create Destination Database in MySQL

In [4]:
# Connect to the server itself (no default schema) so the data mart can be created.
msq_conn_str = f"mysql+pymysql://{mysql_uid}:{quote(mysql_pwd, safe='')}@{mysql_hostname}"
sqlEngine = create_engine(msq_conn_str, pool_recycle=3600)

# Create the data mart only if it is missing. Dropping it would also delete
# sakila_dm.dim_date (populated by lab 2c), which the date-dimension cell below reads,
# so a fresh "Run All" would fail there. The fact and dimension tables are written
# with if_exists='replace' by set_dataframe(..., "insert"), so they are rebuilt anyway.
# (No USE statement: every helper opens its own engine with the database in its URL.)
sqlEngine.execute(f"CREATE DATABASE IF NOT EXISTS `{dst_dbname}`;")
sqlEngine.dispose()

<sqlalchemy.engine.result.ResultProxy at 0x273f88948c8>

#### Populate MongoDB with Source Data
You only need to run this cell once; however, the operation is *idempotent*.  In other words, it can be run multiple times without changing the end result.

In [7]:
# Populate MongoDB (Atlas) with the source JSON documents.
# Each collection is dropped and reloaded from its file, so re-running this cell gives the same result.

client = pymongo.MongoClient(ma_conn_str["atlas"])
db = client[ma_src_dbname]

# The source JSON files live in the 'sakila-db' directory under the notebook's working directory.
data_dir = os.path.join(os.getcwd(), 'sakila-db')

# collection name -> JSON file name
json_files = {"customer" : 'sakila_customer.json'
             }

try:
    for collection_name, file_name in json_files.items():
        json_file = os.path.join(data_dir, file_name)
        # Read the file before dropping anything, so a missing/bad file leaves the
        # existing collection intact. JSON text is UTF-8 (RFC 8259).
        with open(json_file, 'r', encoding='utf-8') as openfile:
            documents = json.load(openfile)

        db.drop_collection(collection_name)
        # insert_many raises on an empty list, so only insert when there is data.
        if documents:
            db[collection_name].insert_many(documents)
finally:
    client.close()

# The documents now sit in the sakila_tables database on Atlas; the customer extract cell below reads them back.

### Create and Populate the New Dimension Tables
#### Extract Customer Data from the Source MongoDB Collection Into DataFrames

In [8]:
# Read every customer document: an empty filter matches all documents, and every
# field is kept (get_mongo_dataframe removes only _id).
collection = "customer"
query = {}

# Reads from Atlas; pass ma_conn_str['local'] instead to use a local MongoDB.
df_customer = get_mongo_dataframe(ma_conn_str['atlas'], ma_src_dbname, collection, query)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### Extract Rental Data from the Source MySQL Schema Into DataFrames

In [9]:
# Full rental table from the source MySQL schema (sakila).
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_sql_dataframe(
    mysql_uid, mysql_pwd, mysql_hostname, msq_src_dbname, sql_rental
)
df_rental.head(2)


Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


#### Extract Date/Time Data from the Source MySQL Schema Into DataFrames
##### Lab 2c must be run first to populate sakila_dm (the destination database) with dim_date before running the cell below

In [10]:
# Only the surrogate date key and the calendar date are needed to look up date keys for the fact table.
sql_dim_date = "SELECT date_key, full_date FROM sakila_dm.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_date)

# Cast to datetime64[ns], the same dtype the fact-table date columns are cast to
# before merging. A unit-less astype('datetime64') is rejected by pandas 2.x.
df_dim_date['full_date'] = df_dim_date['full_date'].astype('datetime64[ns]')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


#### Extract Payment Data from the Local File Source Into DataFrames

In [11]:
# The payment JSON is in the 'sakila-db' folder under the notebook's working
# directory. Build the path relative to it instead of hard-coding one user's machine.
json_file_path = os.path.join(os.getcwd(), 'sakila-db', 'sakila_payment.json')

# Use pandas to read the JSON file
df_payment = pd.read_json(json_file_path)

df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


### Perform Transformations on DataFrames to Prepare for Join
#### Transform Customer Data

In [12]:
# Customer dimension: rename *_id columns to the *_key convention and prefix the
# descriptive columns with customer_, then drop the audit/status columns.
df_customer = (
    df_customer
    .rename(columns={
        "customer_id": "customer_key",
        "store_id": "store_key",
        "first_name": "customer_first_name",
        "last_name": "customer_last_name",
        "email": "customer_email",
        'address_id': 'address_key',
    })
    .drop(columns=['last_update', 'active', 'create_date'])
)

df_customer.head(2)

Unnamed: 0,customer_key,store_key,customer_first_name,customer_last_name,customer_email,address_key
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6


#### Transform Rental Data

In [13]:
# Rename *_id columns to the *_key convention used in the data mart.
df_rental = df_rental.rename(columns={"rental_id": "rental_key", "inventory_id": "inventory_key",
                                      "customer_id": "customer_key", "staff_id": "staff_key"})

# Remove the time of day so the dates can be matched against dim_date.full_date.
# pd.to_datetime + .dt.normalize() sets the time to midnight while keeping a datetime
# dtype, and missing dates stay NaT. Converting to str and splitting would instead
# produce 'NaT'/'None' strings, and str.split with a positional `n` fails on pandas 2.x.
for date_col in ['rental_date', 'return_date']:
    df_rental[date_col] = pd.to_datetime(df_rental[date_col]).dt.normalize()

df_rental = df_rental.drop(columns=['last_update'])

df_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,staff_key
0,1,2005-05-24,367,130,2005-05-26,1
1,2,2005-05-24,1525,459,2005-05-28,1


#### Transform Payment Data

In [14]:
df_payment = df_payment.rename(columns={"payment_id": "payment_key", "rental_id": "rental_key"})

# payment_date is read as a 'YYYY-MM-DD HH:MM:SS' value. Parse it and drop the
# time of day so it can be matched against dim_date.full_date. This also avoids
# str.split with a positional `n`, which pandas 2.x rejects.
df_payment['payment_date'] = pd.to_datetime(df_payment['payment_date']).dt.normalize()

# customer and staff keys come from the rental side of the join, so they are dropped here.
df_payment = df_payment.drop(columns=['customer_id', 'staff_id', 'last_update'])

df_payment.head(2)

Unnamed: 0,payment_key,rental_key,amount,payment_date
0,1,76,2.99,2005-05-25
1,2,573,0.99,2005-05-28


### Create & Transform Fact Table from Data Frames 

#### Join Data Frames Using Pandas Merge

In [15]:
# frp = fact rental payment.
# Right-joining rentals onto payments keeps every payment row.
df_frp = df_rental.merge(df_payment, how='right', on='rental_key')

# Attach the customer columns (store_key, address_key, names, email) via customer_key.
# NOTE(review): customer_key comes from the rental side. A payment whose rental_key has
# no matching rental therefore gets NaN here and is excluded by this right join, while
# a customer without payments would appear with empty payment columns. Confirm intended.
df_frp = df_frp.merge(df_customer, how='right', on='customer_key')

df_frp.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,staff_key,payment_key,amount,payment_date,store_key,customer_first_name,customer_last_name,customer_email,address_key
0,76.0,2005-05-25,3021.0,1,2005-06-03,2.0,1.0,2.99,2005-05-25,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5
1,573.0,2005-05-28,4020.0,1,2005-06-03,1.0,2.0,0.99,2005-05-28,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5


#### Create DateKeys from Date Dimension Table

In [16]:
# Replace rental_date with rental_date_key from the date dimension.

# Copy of dim_date whose columns are named to line up with the fact table's rental_date.
df_dim_rental_date = df_dim_date.rename(
    columns={"date_key": "rental_date_key", "full_date": "rental_date"}
)

# Cast the fact column to datetime64[ns] so it joins against the dimension's full_date.
df_frp['rental_date'] = df_frp['rental_date'].astype('datetime64[ns]')

# The left join keeps every fact row. Afterwards the raw date column is dropped in favour of its key.
df_frp = (
    df_frp
    .merge(df_dim_rental_date, how='left', on='rental_date')
    .drop(columns=['rental_date'])
)

df_frp.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,return_date,staff_key,payment_key,amount,payment_date,store_key,customer_first_name,customer_last_name,customer_email,address_key,rental_date_key
0,76.0,3021.0,1,2005-06-03,2.0,1.0,2.99,2005-05-25,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,20050525.0
1,573.0,4020.0,1,2005-06-03,1.0,2.0,0.99,2005-05-28,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,20050528.0


In [17]:
# Replace return_date with return_date_key (same lookup as the rental-date cell above).
df_dim_return_date = df_dim_date.rename(
    columns={"date_key": "return_date_key", "full_date": "return_date"}
)

# Matching dtype for the join key, then a left join so no fact rows are lost.
df_frp['return_date'] = df_frp['return_date'].astype('datetime64[ns]')
joined = df_frp.merge(df_dim_return_date, how='left', on='return_date')
df_frp = joined.drop(columns=['return_date'])

df_frp.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,staff_key,payment_key,amount,payment_date,store_key,customer_first_name,customer_last_name,customer_email,address_key,rental_date_key,return_date_key
0,76.0,3021.0,1,2.0,1.0,2.99,2005-05-25,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,20050525.0,20050603.0
1,573.0,4020.0,1,1.0,2.0,0.99,2005-05-28,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,20050528.0,20050603.0


In [18]:
# Replace payment_date with payment_date_key.
df_dim_payment_date = df_dim_date.rename(
    columns={"date_key": "payment_date_key", "full_date": "payment_date"}
)

df_frp['payment_date'] = df_frp['payment_date'].astype('datetime64[ns]')
df_frp = df_frp.merge(df_dim_payment_date, how='left', on='payment_date').drop(columns=['payment_date'])

df_frp.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,staff_key,payment_key,amount,store_key,customer_first_name,customer_last_name,customer_email,address_key,rental_date_key,return_date_key,payment_date_key
0,76.0,3021.0,1,2.0,1.0,2.99,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,20050525.0,20050603.0,20050525.0
1,573.0,4020.0,1,1.0,2.0,0.99,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,20050528.0,20050603.0,20050528.0


#### Drop & Reorder Fact Table Columns, Add Primary Key

In [19]:
# Keep only key columns in the fact table. rental_key is dropped as well, because the
# rental information is represented by the rental and return date keys. amount remains
# available in the payment dimension (dim_payment, keyed on payment_key).
df_frp = df_frp.drop(columns=['rental_key', 'amount', 'customer_first_name',
                              'customer_last_name', 'customer_email'])

# Final column order of the fact table.
reordered_columns = ['rental_date_key', 'return_date_key', 'payment_key', 'payment_date_key',
                     'inventory_key', 'customer_key', 'staff_key', 'store_key', 'address_key']
df_frp = df_frp[reordered_columns]

# Surrogate primary key: 1..n in the current row order.
df_frp.insert(0, "fact_rental_purchase_key", range(1, len(df_frp) + 1))

df_frp.head(2)

Unnamed: 0,fact_rental_purchase_key,rental_date_key,return_date_key,payment_key,payment_date_key,inventory_key,customer_key,staff_key,store_key,address_key
0,1,20050525.0,20050603.0,1.0,20050525.0,3021.0,1,2.0,1,5
1,2,20050528.0,20050603.0,2.0,20050528.0,4020.0,1,1.0,1,5


### Drop Unnecessary Keys from the DataFrames That Will Serve as Dimension Tables

In [20]:
# store_key and address_key are already carried in the fact table (df_frp), so the
# customer dimension keeps only the customer key and descriptive columns.
df_customer = df_customer.drop(columns=['store_key', 'address_key'])

df_customer.head(2)

Unnamed: 0,customer_key,customer_first_name,customer_last_name,customer_email
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
1,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org


In [21]:
# The payment dimension keeps only payment_key and amount. payment_date is
# represented in the fact table as payment_date_key.
df_payment = df_payment.drop(columns=['rental_key', 'payment_date'])

df_payment.head(2)

Unnamed: 0,payment_key,amount
0,1,2.99
1,2,0.99


### Load Fact Table & Dimension Tables Back into Data Mart in MySQL

In [22]:
# Fact table -> sakila_dm.fact_rental_payment ("insert" rebuilds the table and adds the primary key).
dataframe = df_frp
table_name = 'fact_rental_payment'
primary_key = 'fact_rental_purchase_key'
db_operation = "insert"

set_dataframe(
    mysql_uid, mysql_pwd, mysql_hostname, dst_dbname,
    df=dataframe, table_name=table_name, pk_column=primary_key, db_operation=db_operation,
)

In [23]:
# Customer dimension -> sakila_dm.dim_customer, primary key customer_key.
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(
    mysql_uid, mysql_pwd, mysql_hostname, dst_dbname,
    df=dataframe, table_name=table_name, pk_column=primary_key, db_operation=db_operation,
)

In [24]:
# Payment dimension -> sakila_dm.dim_payment, primary key payment_key.
dataframe = df_payment
table_name = 'dim_payment'
primary_key = 'payment_key'
db_operation = "insert"

set_dataframe(
    mysql_uid, mysql_pwd, mysql_hostname, dst_dbname,
    df=dataframe, table_name=table_name, pk_column=primary_key, db_operation=db_operation,
)

### Validate that Fact Table & New Dimension Tables were Created in MySQL

In [25]:
# Read the fact table back from MySQL to confirm it was written.
sql_frp = "SELECT * FROM sakila_dm.fact_rental_payment;"
df_sql_frp = get_sql_dataframe(
    mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_frp
)
df_sql_frp.head(2)

Unnamed: 0,fact_rental_purchase_key,rental_date_key,return_date_key,payment_key,payment_date_key,inventory_key,customer_key,staff_key,store_key,address_key
0,1,20050525.0,20050603.0,1.0,20050525.0,3021.0,1,2.0,1,5
1,2,20050528.0,20050603.0,2.0,20050528.0,4020.0,1,1.0,1,5


In [26]:
# Read the customer dimension back from MySQL to confirm it was written.
sql_customer = "SELECT * FROM sakila_dm.dim_customer;"
df_sql_customer = get_sql_dataframe(
    mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_customer
)
df_sql_customer.head(2)

Unnamed: 0,customer_key,customer_first_name,customer_last_name,customer_email
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
1,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org


In [27]:
# Read the payment dimension back from MySQL to confirm it was written.
sql_payment = "SELECT * FROM sakila_dm.dim_payment;"
df_sql_payment = get_sql_dataframe(
    mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_payment
)
df_sql_payment.head(2)

Unnamed: 0,payment_key,amount
0,1,2.99
1,2,0.99


In [31]:
# Average payment amount, computed by MySQL over the loaded dim_payment table.
sql_avg_payment = "SELECT AVG(amount) FROM sakila_dm.dim_payment;"
avg_payment_amount = get_sql_dataframe(
    mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_avg_payment
)
avg_payment_amount.head(1)


Unnamed: 0,AVG(amount)
0,4.132
