# Data Project 1 - NoSQL

**Name:** Victoria Ok (vyo7tv) <br>
**Database Used:** Sakila (https://dev.mysql.com/doc/sakila/en/)

### Set Up
- libraries
- global connection/database variables
- functions to get and set values from the database
- load data into the MongoDB local server

In [1]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

In [2]:
# Connection settings shared by the cells below.
host_name = "localhost"
ports = {"mongo": 27017, "mysql": 3306}

# MySQL credentials. Each can be overridden through an environment variable;
# when the variable is unset the notebook's original value is used, so
# existing runs behave exactly as before.
# NOTE(review): the fallback password is committed in plain text -- rotate it
# and supply it only through SAKILA_MYSQL_PWD before sharing this notebook.
user_id = os.environ.get("SAKILA_MYSQL_USER", "root")
pwd = os.environ.get("SAKILA_MYSQL_PWD", "ViolinOkTree5678!")

# Source MongoDB database and destination MySQL data-warehouse schema.
src_dbname = "sakila_payment"
dst_dbname = "sakila_dw"

In [3]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    """Run a SQL query against a MySQL database and return the rows as a DataFrame.

    Args:
        user_id: MySQL user name.
        pwd: Password for ``user_id`` (plain text, not URL-escaped).
        host_name: Host of the MySQL server.
        db_name: Schema named in the connection URL.
        sql_query: SQL text handed to ``pd.read_sql``.

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_sql``.
    """
    from urllib.parse import quote_plus

    # Escape the credentials so characters such as '@', ':' or '/' in a
    # password cannot corrupt the connection URL.
    conn_str = (
        f"mysql+pymysql://{quote_plus(str(user_id))}:{quote_plus(str(pwd))}"
        f"@{host_name}/{db_name}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even if the query raises.
        with sqlEngine.connect() as conn:
            return pd.read_sql(sql_query, conn)
    finally:
        # A new engine is created on every call, so release its pool here.
        sqlEngine.dispose()


def get_mongo_dataframe(user_id, pwd, host_name, port, db_name, collection, query):
    """Query a MongoDB collection and return the matching documents as a DataFrame.

    Args:
        user_id: MongoDB user name, or a falsy value to connect without
            authentication.
        pwd: Password for ``user_id``, or a falsy value to connect without
            authentication.
        host_name: Host of the MongoDB server.
        port: Port of the MongoDB server.
        db_name: Database that holds ``collection``; also placed in the URI
            path when credentials are supplied.
        collection: Name of the collection to query.
        query: Filter document passed to ``find``.

    Returns:
        A ``pandas.DataFrame`` with one row per matching document and the
        ``_id`` column removed. The frame is empty when nothing matches.
    """
    from urllib.parse import quote_plus

    if user_id and pwd:
        # Credentials must be percent-escaped before being embedded in the URI.
        mongo_uri = (
            f"mongodb://{quote_plus(str(user_id))}:{quote_plus(str(pwd))}"
            f"@{host_name}:{port}/{db_name}"
        )
    else:
        mongo_uri = f"mongodb://{host_name}:{port}/"
    client = pymongo.MongoClient(mongo_uri)

    try:
        # Materialise the cursor before the client is closed.
        documents = list(client[db_name][collection].find(query))
    finally:
        client.close()

    dframe = pd.DataFrame(documents)
    # errors="ignore": an empty result set yields a frame without an '_id' column.
    dframe.drop(columns=['_id'], errors='ignore', inplace=True)

    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    """Write a DataFrame to a MySQL table.

    Args:
        user_id: MySQL user name.
        pwd: Password for ``user_id`` (plain text, not URL-escaped).
        host_name: Host of the MySQL server.
        db_name: Schema named in the connection URL.
        df: DataFrame to write.
        table_name: Destination table.
        pk_column: Column made the primary key after an ``"insert"``.
        db_operation: ``"insert"`` replaces the table with ``df`` and then adds
            the primary key; ``"update"`` appends ``df`` to the table.

    Raises:
        ValueError: If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    """
    # Fail loudly instead of silently writing nothing on a misspelled operation.
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"Unsupported db_operation {db_operation!r}; expected 'insert' or 'update'."
        )

    from urllib.parse import quote_plus
    from sqlalchemy import text

    # Escape the credentials so characters such as '@', ':' or '/' in a
    # password cannot corrupt the connection URL.
    conn_str = (
        f"mysql+pymysql://{quote_plus(str(user_id))}:{quote_plus(str(pwd))}"
        f"@{host_name}/{db_name}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even if a statement raises.
        with sqlEngine.connect() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # Run the DDL on the same connection; Engine.execute() no
                # longer exists in SQLAlchemy 2.0 and raw strings need text().
                connection.execute(
                    text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
                )
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # A new engine is created on every call, so release its pool here.
        sqlEngine.dispose()

In [4]:
# Load each JSON export from ./data into its own collection of the source
# MongoDB database.
# NOTE(review): insert_many appends, so re-running this cell inserts the same
# documents again -- confirm whether the collections should be emptied first.
port = ports["mongo"]
conn_str = f"mongodb://{host_name}:{port}/"
client = pymongo.MongoClient(conn_str)

data_dir = os.path.join(os.getcwd(), 'data')

# collection name -> JSON file inside data_dir
json_files = {"customer" : 'customer_payment.json',
              "staff" : 'staff_payment.json',
              "rental" : 'rental_payment.json',
              "payment" : 'payment_updated.json'
             }

try:
    db = client[src_dbname]
    for collection_name, file_name in json_files.items():
        json_file = os.path.join(data_dir, file_name)
        # JSON text is UTF-8 by specification, so do not rely on the locale default.
        with open(json_file, 'r', encoding='utf-8') as openfile:
            json_object = json.load(openfile)
        db[collection_name].insert_many(json_object)
finally:
    # Close the client even when a file is missing or an insert fails.
    client.close()

### Create and Populate Dimension Tables

### Customer
- Extract data from Source MongoDB Collection
- Transform the data by removing unnecessary columns and renaming others

In [5]:
# EXTRACT: read the "customer" collection of the source database into a DataFrame.

collection = "customer"
port = ports["mongo"]
query = {}  # empty filter document

# No credentials are supplied (user_id and pwd are None).
df_customers_payment = get_mongo_dataframe(
    user_id=None,
    pwd=None,
    host_name=host_name,
    port=port,
    db_name=src_dbname,
    collection=collection,
    query=query,
)
df_customers_payment.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [6]:
# TRANSFORM: shape the customer frame for the dimension table.

# Source columns that are not carried over.
drop_cols = ['store_id', 'address_id', 'last_update']
df_customers_payment.drop(columns=drop_cols, inplace=True)

# The source id column becomes the dimension key.
df_customers_payment.rename(
    columns={
        "customer_id": "customer_key",
    },
    inplace=True,
)

df_customers_payment.head(2)

Unnamed: 0,customer_key,first_name,last_name,email,active,create_date
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36
1,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,2006-02-14 22:04:36


### Staff
- Extract data from Source MongoDB Collection
- Transform the data by removing unnecessary columns and renaming others

In [7]:
# EXTRACT: read the "staff" collection of the source database into a DataFrame.

collection = "staff"
port = ports["mongo"]
query = {}  # empty filter document

# No credentials are supplied (user_id and pwd are None).
df_staff_payment = get_mongo_dataframe(
    user_id=None,
    pwd=None,
    host_name=host_name,
    port=port,
    db_name=src_dbname,
    collection=collection,
    query=query,
)
df_staff_payment.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,email,store_id,active,username,password
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,


In [8]:
# TRANSFORM: shape the staff frame for the dimension table.

# Source columns that are not carried over.
drop_cols = ['address_id', 'username', 'password']
df_staff_payment.drop(columns=drop_cols, inplace=True)

# The source id columns become key columns.
df_staff_payment.rename(
    columns={
        "staff_id": "staff_key",
        "store_id": "store_key",
    },
    inplace=True,
)

df_staff_payment.head(2)

Unnamed: 0,staff_key,first_name,last_name,email,store_key,active
0,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,1
1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,2,1


### Rentals
- Extract data from Source MongoDB Collection
- Transform the data by removing unnecessary columns and renaming others

In [9]:
# EXTRACT: read the "rental" collection of the source database into a DataFrame.

collection = "rental"
port = ports["mongo"]
query = {}  # empty filter document

# No credentials are supplied (user_id and pwd are None).
df_rental_payment = get_mongo_dataframe(
    user_id=None,
    pwd=None,
    host_name=host_name,
    port=port,
    db_name=src_dbname,
    collection=collection,
    query=query,
)
df_rental_payment.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [11]:
# TRANSFORM: shape the rental frame for the dimension table.

# Source columns that are not carried over.
drop_cols = ['last_update']
df_rental_payment.drop(columns=drop_cols, inplace=True)

# The source id column becomes the dimension key.
df_rental_payment.rename(
    columns={
        "rental_id": "rental_key",
    },
    inplace=True,
)

df_rental_payment.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_id,return_date,staff_id
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1


#### Load and Validate transformed dataframes 
- load each transformed dataframe into the new data warehouse by calling the set_dataframe function
- validate with a SQL query that each new dimension table was created

In [12]:
# LOAD: write the customer dimension into the data warehouse.
db_operation = "insert"
primary_key = 'customer_key'
table_name = 'dim_customers_payment'
dataframe = df_customers_payment

set_dataframe(
    user_id=user_id,
    pwd=pwd,
    host_name=host_name,
    db_name=dst_dbname,
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
)

In [15]:
# VALIDATE: read the customer dimension back from the data warehouse.
sql_customers = "SELECT * FROM sakila_dw.dim_customers_payment;"
df_dim_customers_payment = get_sql_dataframe(
    user_id=user_id,
    pwd=pwd,
    host_name=host_name,
    db_name=dst_dbname,
    sql_query=sql_customers,
)
df_dim_customers_payment.head(2)

Unnamed: 0,customer_key,first_name,last_name,email,active,create_date
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36
1,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,2006-02-14 22:04:36


In [13]:
# LOAD: write the staff dimension into the data warehouse.
db_operation = "insert"
primary_key = 'staff_key'
table_name = 'dim_staff_payment'
dataframe = df_staff_payment

set_dataframe(
    user_id=user_id,
    pwd=pwd,
    host_name=host_name,
    db_name=dst_dbname,
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
)

In [16]:
# VALIDATE: read the staff dimension back from the data warehouse.
sql_staff = "SELECT * FROM sakila_dw.dim_staff_payment;"
df_dim_staff_payment = get_sql_dataframe(
    user_id=user_id,
    pwd=pwd,
    host_name=host_name,
    db_name=dst_dbname,
    sql_query=sql_staff,
)
df_dim_staff_payment.head(2)

Unnamed: 0,staff_key,first_name,last_name,email,store_key,active
0,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,1
1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,2,1


In [14]:
# LOAD: write the rental dimension into the data warehouse.
db_operation = "insert"
primary_key = 'rental_key'
table_name = 'dim_rental_payment'
dataframe = df_rental_payment

set_dataframe(
    user_id=user_id,
    pwd=pwd,
    host_name=host_name,
    db_name=dst_dbname,
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
)

In [17]:
# VALIDATE: read the rental dimension back from the data warehouse.
sql_rental = "SELECT * FROM sakila_dw.dim_rental_payment;"
df_dim_rental_payment = get_sql_dataframe(
    user_id=user_id,
    pwd=pwd,
    host_name=host_name,
    db_name=dst_dbname,
    sql_query=sql_rental,
)
df_dim_rental_payment.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_id,return_date,staff_id
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1


### Create and Populate the Payments Fact Table
- Extract data from Source MongoDB Collection
- Transform the data by renaming the ID columns to key columns and adding a surrogate primary key
- Load the dataframe into the data warehouse

In [18]:
# EXTRACT: read the "payment" collection of the source database into a DataFrame.

collection = "payment"
port = ports["mongo"]
query = {}  # empty filter: all elements (columns) and all documents (rows)

# No credentials are supplied (user_id and pwd are None).
df_payment_transactions = get_mongo_dataframe(
    user_id=None,
    pwd=None,
    host_name=host_name,
    port=port,
    db_name=src_dbname,
    collection=collection,
    query=query,
)
df_payment_transactions.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [19]:
# TRANSFORM: rename the id columns to keys and add a surrogate key column.

# source column -> fact-table column
column_name_map = {
    "payment_id": "payment_key",
    "customer_id": "customer_key",
    "staff_id": "staff_key",
    "rental_id": "rental_key",
}
df_payment_transactions.rename(columns=column_name_map, inplace=True)

# Surrogate key: 1..N in current row order, placed as the first column.
df_payment_transactions.insert(
    loc=0,
    column="fact_payment_transactions_key",
    value=range(1, len(df_payment_transactions) + 1),
)
df_payment_transactions.head(2)

Unnamed: 0,fact_payment_transactions_key,payment_key,customer_key,staff_key,rental_key,amount,payment_date,last_update
0,1,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30
2,3,3,1,1,1185,5.99,2005-06-15 00:54:12,2006-02-15 22:12:30
3,4,4,1,2,1422,0.99,2005-06-15 18:02:53,2006-02-15 22:12:30
4,5,5,1,2,1476,9.99,2005-06-15 21:08:46,2006-02-15 22:12:30
...,...,...,...,...,...,...,...,...
16039,16040,16045,599,1,14599,4.99,2005-08-21 17:43:42,2006-02-15 22:24:12
16040,16041,16046,599,1,14719,1.99,2005-08-21 21:41:57,2006-02-15 22:24:12
16041,16042,16047,599,2,15590,8.99,2005-08-23 06:09:44,2006-02-15 22:24:12
16042,16043,16048,599,2,15719,2.99,2005-08-23 11:08:46,2006-02-15 22:24:13


In [20]:
# LOAD: write the payments fact table into the data warehouse.
db_operation = "insert"
primary_key = 'fact_payment_transactions_key'
table_name = 'fact_payment_transactions'
dataframe = df_payment_transactions

set_dataframe(
    user_id=user_id,
    pwd=pwd,
    host_name=host_name,
    db_name=dst_dbname,
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
)

#### Validate that the Fact Table was created with SQL queries

In [21]:
# VALIDATE: read the fact table back from the data warehouse.
# This rebinds df_payment_transactions to the rows returned by the query.
sql_payment_transactions = "SELECT * FROM sakila_dw.fact_payment_transactions;"
df_payment_transactions = get_sql_dataframe(
    user_id=user_id,
    pwd=pwd,
    host_name=host_name,
    db_name=dst_dbname,
    sql_query=sql_payment_transactions,
)
df_payment_transactions.head(2)

Unnamed: 0,fact_payment_transactions_key,payment_key,customer_key,staff_key,rental_key,amount,payment_date,last_update
0,1,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [22]:
# VALIDATE: count the rows stored in the fact table.
sql_payment_transactions_count = "SELECT COUNT(*) FROM sakila_dw.fact_payment_transactions;"
df_count = get_sql_dataframe(
    user_id=user_id,
    pwd=pwd,
    host_name=host_name,
    db_name=dst_dbname,
    sql_query=sql_payment_transactions_count,
)
df_count.head()

Unnamed: 0,COUNT(*)
0,16044
