# Midterm Project

## 1.0 Jupyter Notebook Setup

#### 1.1 Import the Necessary Libraries

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

#### 1.2 Define global variables for data file paths, database connection credentials, etc.

In [2]:
# Connection settings are read from environment variables when they are set, so
# credentials do not have to live in the notebook. The literal fallbacks keep the
# notebook runnable exactly as it was originally written.
# NOTE(review): the fallback passwords below are secrets committed in plain text --
# rotate them and delete the literals once the environment variables are in place.
mysql_args = {
    "user_id" : os.environ.get("SAKILA_MYSQL_USER", "root"),
    "pwd" : os.environ.get("SAKILA_MYSQL_PWD", "Passw0rd123"),
    "host_name" : os.environ.get("SAKILA_MYSQL_HOST", "localhost"),
    "db_name" : os.environ.get("SAKILA_MYSQL_DB", "sakila_dw")
}

mongodb_args = {
    "user_name" : os.environ.get("SAKILA_MONGO_USER", "qqz6vz"),
    "password" : os.environ.get("SAKILA_MONGO_PWD", "Br00ks129"),
    "cluster_name" : os.environ.get("SAKILA_MONGO_CLUSTER", "mycluster"),
    "cluster_subnet" : os.environ.get("SAKILA_MONGO_SUBNET", "rulta"),
    "cluster_location" : os.environ.get("SAKILA_MONGO_LOCATION", "atlas"), # "atlas" or "local"
    "db_name" : os.environ.get("SAKILA_MONGO_DB", "sakila")
}

#### 1.3 Define global functions for reading and writing to MySql, reading from MongoDB, etc.

In [3]:
def get_sql_dataframe(sql_query, **args):
    '''Run a SQL query against the MySQL database and return the rows as a Pandas DataFrame.

    Parameters:
        sql_query: the SQL statement passed to pd.read_sql().
        **args: connection settings; must contain 'user_id', 'pwd', 'host_name' and 'db_name'.

    Returns:
        The DataFrame produced by pd.read_sql().

    Raises:
        KeyError: if one of the required connection settings is missing.
        Errors raised by SQLAlchemy, the driver or Pandas propagate unchanged.
    '''
    from urllib.parse import quote

    # Percent-encode the credentials so characters such as '@', ':' or '/' in a
    # user name or password cannot corrupt the connection URL.
    user = quote(str(args['user_id']), safe='')
    password = quote(str(args['pwd']), safe='')
    conn_str = f"mysql+pymysql://{user}:{password}@{args['host_name']}/{args['db_name']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when the query raises.
        with sqlEngine.connect() as connection:
            dframe = pd.read_sql(sql_query, connection)
    finally:
        # Every call builds its own engine, so release its connection pool too.
        sqlEngine.dispose()

    return dframe

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write a Pandas DataFrame to a MySQL table.

    Parameters:
        df: the DataFrame to write.
        table_name: name of the target table.
        pk_column: column declared as PRIMARY KEY after an "insert".
        db_operation: "insert" replaces the table with the DataFrame and then adds
            the primary key; "update" appends the DataFrame rows to the table.
        **args: connection settings; must contain 'user_id', 'pwd', 'host_name' and 'db_name'.

    Raises:
        ValueError: if db_operation is neither "insert" nor "update".
        KeyError: if one of the required connection settings is missing.
        Errors raised by SQLAlchemy, the driver or Pandas propagate unchanged.
    '''
    # Reject an unknown operation up front instead of silently writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(f"db_operation must be 'insert' or 'update', got {db_operation!r}")

    from urllib.parse import quote

    # Percent-encode the credentials so characters such as '@', ':' or '/' in a
    # user name or password cannot corrupt the connection URL.
    user = quote(str(args['user_id']), safe='')
    password = quote(str(args['pwd']), safe='')
    conn_str = f"mysql+pymysql://{user}:{password}@{args['host_name']}/{args['db_name']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # engine.begin() commits on success, rolls back on error, and always
        # closes the connection.
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # Raw SQL strings must be wrapped in text() to be executable on
                # SQLAlchemy 2.x connections.
                connection.execute(
                    sqlalchemy.text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
                )
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # Every call builds its own engine, so release its connection pool too.
        sqlEngine.dispose()


def get_mongo_client(**args):
    '''Create a pymongo.MongoClient for either a MongoDB Atlas cluster or a local server.

    Parameters:
        **args: must contain 'cluster_location' ('atlas' or 'local'). For 'atlas'
            it must also contain 'user_name', 'password', 'cluster_name' and
            'cluster_subnet', which are combined into a mongodb+srv:// URI.

    Returns:
        A pymongo.MongoClient instance.

    Raises:
        ValueError: if cluster_location is neither 'atlas' nor 'local'.
        KeyError: if a required setting is missing.
    '''
    from urllib.parse import quote

    location = args["cluster_location"]
    if location not in ('atlas', 'local'):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # Credentials inside a MongoDB URI must be percent-encoded, otherwise
    # characters such as '@', ':' or '/' break URI parsing.
    user = quote(str(args['user_name']), safe='')
    password = quote(str(args['password']), safe='')
    connect_str = f"mongodb+srv://{user}:{password}@"
    connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"

    # certifi supplies the CA bundle used to verify the TLS connection.
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query a MongoDB collection and return the matching documents as a DataFrame.

    Parameters:
        mongo_client: client object indexed by database name; it is closed before
            this function returns, even if the query raises.
        db_name: name of the database to read from.
        collection: name of the collection to query.
        query: filter passed to the collection's find() method.

    Returns:
        A DataFrame with one row per document and the '_id' column removed.
        The DataFrame is empty when no document matches.
    '''
    try:
        documents = list(mongo_client[db_name][collection].find(query))
    finally:
        mongo_client.close()

    dframe = pd.DataFrame(documents)
    # errors='ignore' keeps an empty result (which has no '_id' column) from
    # raising a KeyError.
    return dframe.drop(columns=['_id'], errors='ignore')


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''(Re)create MongoDB collections from JSON files.

    Parameters:
        mongo_client: client object indexed by database name; it is closed before
            this function returns, even if loading fails.
        db_name: name of the database that receives the collections.
        data_directory: directory that contains the JSON files.
        json_files: dict mapping each collection name to the JSON file name
            whose contents populate it.

    Each collection is dropped and then refilled with insert_many().
    '''
    try:
        db = mongo_client[db_name]

        for collection_name, file_name in json_files.items():
            json_path = os.path.join(data_directory, file_name)

            # Read the file before dropping anything, so an unreadable or
            # malformed file does not leave the existing collection destroyed.
            with open(json_path, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)

            db.drop_collection(collection_name)

            # Skip the insert for an empty file so the collection is simply
            # left empty.
            if documents:
                db[collection_name].insert_many(documents)
    finally:
        mongo_client.close()

#### 1.4 Populate MongoDB with Source Data 

In [4]:
# Connect to MongoDB using the settings in mongodb_args.
client = get_mongo_client(**mongodb_args)

# The JSON source files are read from the notebook's current working directory.
data_dir = os.getcwd()

# Maps each target collection name to the JSON file that populates it.
json_files = {"customer" : 'sakila_customer.json'}

# Drops and reloads every listed collection, then closes the client.
set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

## 2.0 Create and Populate the New Dimension Tables

### 2.1 Extract Data from Sources into Pandas DataFrames and Make Any Necessary Transformations

#### 2.1.1 CSV File

In [5]:
# Load data from CSV file.
# Prefer a copy in the notebook's working directory so the notebook also runs on
# other machines; fall back to the original absolute path when no such copy exists.
csv_file_path = os.path.join(os.getcwd(), 'sakila_inventory.csv')
if not os.path.exists(csv_file_path):
    csv_file_path = '/Users/kaitlynbrooks/Downloads/DS-2002-main/Projects/Midterm Project/sakila_inventory.csv'
df_inventory = pd.read_csv(csv_file_path)

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_inventory.insert(0, "inventory_key", range(1, df_inventory.shape[0]+1))

# Display the first 2 rows of the dataframe to validate your work
df_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id,last_update
0,1,1,1,1,2006-02-15 05:09:17
1,2,2,1,1,2006-02-15 05:09:17


#### 2.1.2 MongoDB Collections 

In [6]:
# Read the whole "customer" collection from MongoDB; the empty filter matches
# every document.
client = get_mongo_client(**mongodb_args)
query = {}
collection = "customer"
df_customer = get_mongo_dataframe(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    collection=collection,
    query=query,
)

# Prepend a sequential surrogate key (1..N) that will act as the primary key.
customer_keys = range(1, len(df_customer) + 1)
df_customer.insert(0, "customer_key", customer_keys)

# Preview the first two rows as a sanity check.
df_customer.head(n=2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### 2.1.3 SQL Database

In [7]:
# Read the staff table from the source MySQL database.
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_sql_dataframe(sql_query=sql_staff, **mysql_args)

# The picture and password columns are not carried into the dimension table.
df_staff = df_staff.drop(columns=['picture', 'password'])

# Prepend a sequential surrogate key (1..N) that will act as the primary key.
staff_keys = range(1, len(df_staff) + 1)
df_staff.insert(0, "staff_key", staff_keys)

# Preview the first two rows as a sanity check.
df_staff.head(n=2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,address_id,email,store_id,active,username,last_update
0,1,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,2006-02-15 03:57:16
1,2,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,2006-02-15 03:57:16


### 2.2 Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables 

In [8]:
# "insert" makes set_dataframe() replace each table and add its primary key.
db_operation = "insert"

# (target table, source DataFrame, primary-key column) for every dimension.
tables = [
    ('dim_customer', df_customer, 'customer_key'),
    ('dim_inventory', df_inventory, 'inventory_key'),
    ('dim_staff', df_staff, 'staff_key'),
]

for table_name, dataframe, primary_key in tables:
    set_dataframe(
        df=dataframe,
        table_name=table_name,
        pk_column=primary_key,
        db_operation=db_operation,
        **mysql_args,
    )

## 3.0 Create & Populate the Fact Table 

### 3.1 Read Data from the Transaction Table in your Source Database

In [9]:
# Pull every row of the rental transaction table from the source database.
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_sql_dataframe(sql_query=sql_rental, **mysql_args)
df_rental.head(n=2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [12]:
# Pull the payment table and keep only the payment-specific columns; the rental
# rows it is merged with in the next step already carry staff_id, customer_id
# and last_update.
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = (
    get_sql_dataframe(sql_query=sql_payment, **mysql_args)
    .drop(columns=['staff_id', 'customer_id', 'last_update'])
)
df_payment.head(n=2)

Unnamed: 0,payment_id,rental_id,amount,payment_date
0,1,76,2.99,2005-05-25 11:30:37
1,2,573,0.99,2005-05-28 10:35:23


In [13]:
# Combine rentals with their payments on rental_id; the outer join keeps rows
# that appear on only one side.
df_fact_rentals = df_rental.merge(df_payment, how='outer', on='rental_id')
df_fact_rentals.head(n=2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update,payment_id,amount,payment_date
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3504,2.99,2005-05-24 22:53:30
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,12377,2.99,2005-05-24 22:54:33


### 3.2 Lookup the Primary Keys from the Dimension Tables 

In [14]:
# Fetch the surrogate key / business key pairs of the customer dimension.
sql_dim_customer = "SELECT customer_key, customer_id FROM sakila_dw.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_query=sql_dim_customer, **mysql_args)
df_dim_customer.head(n=2)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2


In [15]:
# Fetch the surrogate key / business key pairs of the inventory dimension.
sql_dim_inventory = "SELECT inventory_key, inventory_id FROM sakila_dw.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_query=sql_dim_inventory, **mysql_args)
df_dim_inventory.head(n=2)

Unnamed: 0,inventory_key,inventory_id
0,1,1
1,2,2


In [16]:
# Fetch the surrogate key / business key pairs of the staff dimension.
sql_dim_staff = "SELECT staff_key, staff_id FROM sakila_dw.dim_staff;"
df_dim_staff = get_sql_dataframe(sql_query=sql_dim_staff, **mysql_args)
df_dim_staff.head(n=2)

Unnamed: 0,staff_key,staff_id
0,1,1
1,2,2


### 3.3 Use the Business Keys to Look Up the Corresponding Surrogate Primary Keys in the Dimension Tables

In [18]:
# For each dimension, left-join on the business key to bring in the surrogate
# key, then drop the business key from the fact table.
dimension_lookups = [
    (df_dim_customer, 'customer_id'),
    (df_dim_inventory, 'inventory_id'),
    (df_dim_staff, 'staff_id'),
]

for df_dim, business_key in dimension_lookups:
    df_fact_rentals = (
        df_fact_rentals
        .merge(df_dim, how='left', on=business_key)
        .drop(columns=[business_key])
    )

# Prepend a sequential surrogate key (1..N) for the fact table itself.
fact_keys = range(1, len(df_fact_rentals) + 1)
df_fact_rentals.insert(0, "fact_rentals_key", fact_keys)

df_fact_rentals.head(n=2)

Unnamed: 0,fact_rentals_key,rental_id,rental_date,return_date,last_update,payment_id,amount,payment_date,customer_key,inventory_key,staff_key
0,1,1,2005-05-24 22:53:30,2005-05-26 22:04:30,2006-02-15 21:30:53,3504,2.99,2005-05-24 22:53:30,130,367.0,1
1,2,2,2005-05-24 22:54:33,2005-05-28 19:40:33,2006-02-15 21:30:53,12377,2.99,2005-05-24 22:54:33,459,,1


### 3.4 Lookup the DateKeys from the Date Dimension Table

In [20]:
# Read the date dimension and reduce full_date to plain date objects so it can
# be joined against the date part of the fact table's timestamps.
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_query=sql_dim_date, **mysql_args)
df_dim_date["full_date"] = df_dim_date["full_date"].astype('datetime64[ns]').dt.date
df_dim_date.head(n=2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [22]:
# Swap rental_date for its date-dimension key: rename the dimension columns to
# match, truncate the timestamp to a date, left-join, then drop the raw date.
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rentals["rental_date"] = df_fact_rentals["rental_date"].astype('datetime64[ns]').dt.date
df_fact_rentals = (
    df_fact_rentals
    .merge(df_dim_rental_date, how='left', on='rental_date')
    .drop(columns=['rental_date'])
)
df_fact_rentals.head(n=2)

Unnamed: 0,fact_rentals_key,rental_id,return_date,last_update,payment_id,amount,payment_date,customer_key,inventory_key,staff_key,rental_date_key
0,1,1,2005-05-26 22:04:30,2006-02-15 21:30:53,3504,2.99,2005-05-24 22:53:30,130,367.0,1,20050524
1,2,2,2005-05-28 19:40:33,2006-02-15 21:30:53,12377,2.99,2005-05-24 22:54:33,459,,1,20050524


In [23]:
# Swap return_date for its date-dimension key: rename the dimension columns to
# match, truncate the timestamp to a date, left-join, then drop the raw date.
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rentals["return_date"] = df_fact_rentals["return_date"].astype('datetime64[ns]').dt.date
df_fact_rentals = (
    df_fact_rentals
    .merge(df_dim_return_date, how='left', on='return_date')
    .drop(columns=['return_date'])
)
df_fact_rentals.head(n=2)

Unnamed: 0,fact_rentals_key,rental_id,last_update,payment_id,amount,payment_date,customer_key,inventory_key,staff_key,rental_date_key,return_date_key
0,1,1,2006-02-15 21:30:53,3504,2.99,2005-05-24 22:53:30,130,367.0,1,20050524,20050526.0
1,2,2,2006-02-15 21:30:53,12377,2.99,2005-05-24 22:54:33,459,,1,20050524,20050528.0


In [24]:
# Swap last_update for its date-dimension key: rename the dimension columns to
# match, truncate the timestamp to a date, left-join, then drop the raw date.
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_fact_rentals["last_update"] = df_fact_rentals["last_update"].astype('datetime64[ns]').dt.date
df_fact_rentals = (
    df_fact_rentals
    .merge(df_dim_last_update, how='left', on='last_update')
    .drop(columns=['last_update'])
)
df_fact_rentals.head(n=2)

Unnamed: 0,fact_rentals_key,rental_id,payment_id,amount,payment_date,customer_key,inventory_key,staff_key,rental_date_key,return_date_key,last_update_key
0,1,1,3504,2.99,2005-05-24 22:53:30,130,367.0,1,20050524,20050526.0,20060215
1,2,2,12377,2.99,2005-05-24 22:54:33,459,,1,20050524,20050528.0,20060215


In [25]:
# Swap payment_date for its date-dimension key: rename the dimension columns to
# match, truncate the timestamp to a date, left-join, then drop the raw date.
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_rentals["payment_date"] = df_fact_rentals["payment_date"].astype('datetime64[ns]').dt.date
df_fact_rentals = (
    df_fact_rentals
    .merge(df_dim_payment_date, how='left', on='payment_date')
    .drop(columns=['payment_date'])
)
df_fact_rentals.head(n=2)

Unnamed: 0,fact_rentals_key,rental_id,payment_id,amount,customer_key,inventory_key,staff_key,rental_date_key,return_date_key,last_update_key,payment_date_key
0,1,1,3504,2.99,130,367.0,1,20050524,20050526.0,20060215,20050524
1,2,2,12377,2.99,459,,1,20050524,20050528.0,20060215,20050524


### 3.6 Load the Newly Transformed Fact Table into the Sakila_DW Data Warehouse

In [28]:
# Write the finished fact table to the warehouse; "insert" replaces any existing
# table and then declares fact_rentals_key as the primary key.
dataframe = df_fact_rentals
table_name = 'fact_rentals'
primary_key = 'fact_rentals_key'
db_operation = 'insert'

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

### 3.7 Validate that the New Fact Table was Created 

In [29]:
# Read the fact table back from the warehouse to confirm it was created.
# Note: this rebinds df_fact_rentals to the rows returned by the database.
sql_fact_rentals = "SELECT * FROM sakila_dw.fact_rentals;"
df_fact_rentals = get_sql_dataframe(sql_query=sql_fact_rentals, **mysql_args)
df_fact_rentals.head(n=2)

Unnamed: 0,fact_rentals_key,rental_id,payment_id,amount,customer_key,inventory_key,staff_key,rental_date_key,return_date_key,last_update_key,payment_date_key
0,1,1,3504,2.99,130,367.0,1,20050524,20050526.0,20060215,20050524
1,2,2,12377,2.99,459,,1,20050524,20050528.0,20060215,20050524


## 4.0 Demonstrate that the New Data Warehouse Exists and Contains the Correct Data 

To demonstrate the viability of my solution, I will author a SQL SELECT statement that returns:
- Each Customer's Last Name
- The total number of rentals associated with each Customer
- The total payment amount associated with each Customer

In [33]:
# Per-customer rental count and payment total, highest total first.
# Grouping on customer_key as well as last_name keeps two different customers
# who share a surname from being merged into a single row.
sql_rentals = """
    SELECT customers.`last_name` AS `customer_name`,
        COUNT(rentals.`rental_id`) AS `total_rentals`,
        SUM(rentals.`amount`) AS `total_payment`
    FROM `sakila_dw`.`fact_rentals` AS rentals
    INNER JOIN `sakila_dw`.`dim_customer` AS customers
    ON rentals.customer_key = customers.customer_key
    GROUP BY customers.`customer_key`, customers.`last_name`
    ORDER BY total_payment DESC;
"""

df_rentals = get_sql_dataframe(sql_rentals, **mysql_args)
df_rentals.head()

Unnamed: 0,customer_name,total_rentals,total_payment
0,SEAL,45,221.55
1,HUNT,46,216.54
2,SHAW,42,195.58
3,KENNEDY,39,194.61
4,SNYDER,39,194.61
