## Midterm Project - Madison Dietl
I drew on the Sakila database to build my data warehouse, which models the business process of movie rentals. In my notebook, I start by declaring my connection variables and defining functions for getting data from and setting data into the databases. I then create the data warehouse and import data from three sources (MySQL, a CSV file, and MongoDB populated from a JSON file) to create dimension tables, which I then transform. Next, I create a fact table, merge payment data into it, and transform it to create foreign keys and organize its columns. Finally, I load the fact table and all of the new dimension tables into my data warehouse. I test it with two SELECT statements that each join one dimension table (customer or staff) to the fact table. Thanks for reading!
#### Import necessary libraries

In [1]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine
import json
import datetime
import pymongo

#### Declare and assign connection variables

In [2]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw2"

mysql_uid = "test"
mysql_pwd = "Peachesrule1"
mysql_host= "ds2002-mysql.mysql.database.azure.com"

conn_str = {"local" : "mongodb://localhost:27017/"}

print(f"Local Connection String: {conn_str['local']}")

Local Connection String: mongodb://localhost:27017/
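
The connection strings in this notebook are built by pasting the user name and password straight into a URL. A minimal sketch of one alternative is shown here: percent-encoding the credentials so that characters such as `@` or `/` in a password do not break the URL, and reading them from environment variables instead of the notebook. The helper `build_mysql_url` and the variable names `MYSQL_USER` and `MYSQL_PWD` are hypothetical and are not used elsewhere in this notebook.

```python
import os
from urllib.parse import quote_plus


def build_mysql_url(user, password, host, db_name=""):
    """Build a mysql+pymysql URL, percent-encoding the user name and password."""
    url = f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}@{host}"
    return f"{url}/{db_name}" if db_name else url


# MYSQL_USER and MYSQL_PWD are hypothetical environment variable names;
# the fallbacks are placeholders, not real credentials.
example_user = os.environ.get("MYSQL_USER", "root")
example_pwd = os.environ.get("MYSQL_PWD", "")

# A made-up password containing characters that are special inside a URL.
example_url = build_mysql_url("root", "p@ss/word", "localhost", "sakila")
print(example_url)
```

The resulting string could be passed to `create_engine` in place of the f-strings used in the functions below.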


#### Define functions for getting data from and setting data into databases

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Run a SQL query against a MySQL database and return the result as a Pandas DataFrame.'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection)
    connection.close()
    
    return dframe

def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Same behavior as get_dataframe(): query MySQL and return a Pandas DataFrame.'''
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    # Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn)
    conn.close()
    
    return dframe

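def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Query a MongoDB collection and return the matching documents as a Pandas DataFrame.'''
    # Create a connection to MongoDB
    client = pymongo.MongoClient(connect_str)
    
    # Query MongoDB, and fill a Python list with documents to create a DataFrame
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    # Drop MongoDB's internal id; errors='ignore' avoids a KeyError when the query returns no documents
    dframe.drop(['_id'], axis=1, inplace=True, errors='ignore')
    client.close()
    return dframe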


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Write a DataFrame to a MySQL table, either replacing the table or appending to it.'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        # Replace any existing table with the DataFrame, then declare its primary key
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        # Append the DataFrame's rows to the existing table
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Create new data warehouse

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x168a9dcf700>

#### Import data from MySQL

In [5]:
sql_inventory = """ 
SELECT i.inventory_id,
i.last_update,
f.rental_rate,
f.film_id AS film_id
FROM sakila.inventory AS i
INNER JOIN sakila.film AS f
ON i.film_id = f.film_id;
"""

df_inventory = get_dataframe(user_id, pwd, host_name, src_dbname, sql_inventory)
df_inventory.head(2)

Unnamed: 0,inventory_id,last_update,rental_rate,film_id
0,1,2006-02-15 05:09:17,0.99,1
1,2,2006-02-15 05:09:17,0.99,1


In [6]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


#### Read in a CSV file with Sakila staff data

In [7]:
data_dir = os.path.join(os.getcwd(), 'sakilacsv')
data_file = os.path.join(data_dir, 'sakila.staff.csv')

df_staff = pd.read_csv(data_file, header=0)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


#### Populate MongoDB with source data

In [8]:
client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client[src_dbname]

# Gets the path of the current working directory for this notebook, and then appends the 'sakilajson' directory.
data_dir = os.path.join(os.getcwd(), 'sakilajson')

json_files = {"customer" : 'sakila.customer.json'}

for collection_name, file_name in json_files.items():
    # Start from an empty collection so reruns do not duplicate documents
    db.drop_collection(collection_name)
    json_file = os.path.join(data_dir, file_name)
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        result = db[collection_name].insert_many(json_object)
        #print(f"{collection_name} was successfully loaded.")

        
client.close() 

#### Extract data from Mongo

In [9]:
query = {}
collection = "customer"

df_customer = get_mongo_dataframe("mongodb://localhost:27017/", src_dbname, collection, query)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### Make necessary transformations to dimension tables

In [10]:
drop_columns = ['payment_id','customer_id','staff_id','last_update']
df_payment.drop(drop_columns, axis=1, inplace=True)

df_payment.head(2)

Unnamed: 0,rental_id,amount,payment_date
0,76,2.99,2005-05-25 11:30:37
1,573,0.99,2005-05-28 10:35:23


In [11]:
df_inventory.rename(columns={"inventory_id":"inventory_key"}, inplace=True)

df_inventory.head(2)

Unnamed: 0,inventory_key,last_update,rental_rate,film_id
0,1,2006-02-15 05:09:17,0.99,1
1,2,2006-02-15 05:09:17,0.99,1


In [12]:
df_staff.rename(columns={"staff_id":"staff_key"}, inplace=True)

df_staff.head(2)

Unnamed: 0,staff_key,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [13]:
df_customer.rename(columns={"customer_id":"customer_key"}, inplace=True)

df_customer.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### Create and populate fact table

In [None]:
sql_rental = "SELECT * FROM sakila.rental;"
df_fact_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)

df_fact_rental = pd.merge(df_fact_rental, df_payment, on='rental_id', how='inner')

df_fact_rental.head(2)

#### Perform additional transformations to fact table

In [None]:
df_fact_rental.rename(columns={"rental_id":"fact_rental_key", "inventory_id":"inventory_key",
                               "customer_id":"customer_key", "staff_id":"staff_key"}, inplace=True)

ordered_columns = ['fact_rental_key','inventory_key','customer_key','staff_key','rental_date','return_date', 'amount',
                   'last_update']
df_fact_rental = df_fact_rental[ordered_columns]

df_fact_rental.head(2)

#### Get the data from the date dimension table

In [None]:
df_fact_rental['rental_date'] = pd.to_datetime(df_fact_rental['rental_date']).dt.date
df_fact_rental['return_date'] = pd.to_datetime(df_fact_rental['return_date']).dt.date

In [None]:
sql_dim_date = "SELECT date_key, full_date FROM sakila.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = pd.to_datetime(df_dim_date['full_date']).dt.date
df_dim_date.head(2)

#### Look up the date keys from the date dimension table

In [None]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on='rental_date', how='inner')
df_fact_rental.drop(['rental_date'], axis=1, inplace=True)
df_fact_rental.head(2)

In [None]:
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_return_date, on='return_date', how='inner')
df_fact_rental.drop(['return_date'], axis=1, inplace=True)
df_fact_rental.head(2)
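
One thing to watch in the lookups above: an inner merge keeps only rows whose date appears in the date dimension, so a rental with no `return_date` (one not yet returned) would be dropped from the fact table. The sketch below illustrates this with small made-up frames (the rows and the `20050526`-style keys are hypothetical, not Sakila data) and shows a left merge as one possible alternative.

```python
from datetime import date

import pandas as pd

# Hypothetical miniature fact table: rental 2 has not been returned yet.
fact = pd.DataFrame({
    "fact_rental_key": [1, 2, 3],
    "return_date": [date(2005, 5, 26), None, date(2005, 5, 27)],
})

# Hypothetical miniature date dimension, already renamed for the lookup.
dim_return_date = pd.DataFrame({
    "return_date_key": [20050526, 20050527],
    "return_date": [date(2005, 5, 26), date(2005, 5, 27)],
})

# Inner merge: the unreturned rental has no matching date and disappears.
inner = pd.merge(fact, dim_return_date, on="return_date", how="inner")

# Left merge: every fact row is kept; the unreturned rental gets a missing key.
left = pd.merge(fact, dim_return_date, on="return_date", how="left")
# A nullable integer type keeps the keys as integers despite the missing value.
left["return_date_key"] = left["return_date_key"].astype("Int64")

print(len(fact), len(inner), len(left))
```

Whether unreturned rentals belong in the fact table is a modeling decision; the left merge is only worth adopting if they should be counted.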

#### Write dataframe back to database

In [None]:
table_name = "fact_rental"
primary_key = "fact_rental_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_rental, table_name, primary_key, db_operation)
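
Because the "insert" operation runs `ALTER TABLE ... ADD PRIMARY KEY` after writing the table, the load fails if the key column contains duplicates or missing values. A minimal sketch of a check that could be run beforehand is shown here; `check_primary_key` is a hypothetical helper, not part of this notebook, and the sample frame is made up.

```python
import pandas as pd


def check_primary_key(df, pk_column):
    """Return a list of problems that would stop pk_column from serving as a primary key."""
    if pk_column not in df.columns:
        return [f"column '{pk_column}' is missing"]

    problems = []
    if df[pk_column].isna().any():
        problems.append(f"column '{pk_column}' contains missing values")
    if df[pk_column].duplicated().any():
        problems.append(f"column '{pk_column}' contains duplicate values")
    return problems


# Made-up example: key 2 appears twice, so it cannot be a primary key.
sample = pd.DataFrame({"fact_rental_key": [1, 2, 2], "amount": [2.99, 0.99, 4.99]})
print(check_primary_key(sample, "fact_rental_key"))
```

An empty list means the column is safe to declare as the primary key.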

#### Load the transformed data frames into the new data warehouse

In [None]:
db_operation = "insert"

tables = [('dim_inventory', df_inventory, 'inventory_key'),
          ('dim_staff', df_staff, 'staff_key'),
          ('dim_customer', df_customer, 'customer_key'),
          ('dim_date', df_dim_date, 'date_key')]

In [None]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### Testing the data warehouse

#### This test returns the total amount spent on movie rentals, grouped by customer last name, in descending order

In [None]:
sql_test = """
    SELECT customer.`last_name` as `customer_name`,
        SUM(rental.`amount`) AS `total_amount_spent`
    FROM `{0}`.`fact_rental` AS rental
    INNER JOIN `{0}`.dim_customer AS customer
    ON rental.customer_key = customer.customer_key
    GROUP BY customer.`last_name`
    ORDER BY total_amount_spent DESC;
    """.format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_test)

In [None]:
df_test.head()

#### This test returns the total sales of the two staff members, Jon and Mike

In [None]:
sql_test2 = """
    SELECT staff.`first_name` as `employee_name`,
        SUM(rental.`amount`) AS `total_sales`
    FROM `{0}`.`fact_rental` AS rental
    INNER JOIN `{0}`.dim_staff AS staff
    ON rental.staff_key = staff.staff_key
    GROUP BY staff.`first_name`
    ORDER BY total_sales DESC;
    """.format(dst_dbname)

df_test2 = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_test2)

In [None]:
df_test2.head()