## Midterm Project - Sakila database 

#### Import the Necessary Libraries

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.6.2


#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working 

In [3]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw2"

#### Define Functions for Getting Data From and Setting Data Into Databases

In [4]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context.
Create the New Data Warehouse database, and to Use it, Switch the Connection Context.
Clearly, you won't get very far without having a database to work with. Here we demonstrate how we can drop a database if it already exists, and then create the new sakila_dw2 database and use it as the target of all subsequent operations.

In [5]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1d1f27df650>

### 1.0. Create & Populate the Dimension Tables
In any extract-transform-load (ETL) process used to populate a multi-dimensional data warehouse database it is necessary to populate the Dimension tables before attempting to populate the Fact table(s). This is because rows in the Fact table(s) will reference surrogate primary key values from the Dimension tables. If the primary key values in the Dimension tables either do not exist, or do not reflect the current state of the dimension, then the attempt to load the Fact table(s) will fail.

#### 1.1. Extract Data from the Source Database Tables
Extracting data from relevant tables relating to rentals: customer, film, inventory, payment, rental, store, and address.

In [6]:
# customer
sql_customer = "SELECT * FROM sakila.customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-14 23:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-14 23:57:20


In [7]:
# film
sql_film = "SELECT * FROM sakila.film;"
df_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


In [8]:
# inventory 
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_dataframe(user_id, pwd, host_name, src_dbname, sql_inventory)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 00:09:17
1,2,1,1,2006-02-15 00:09:17


In [9]:
# payment 
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 17:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 17:12:30


In [10]:
# rental
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 16:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 16:30:53


In [11]:
# store 
sql_store = "SELECT * FROM sakila.store;"
df_store = get_dataframe(user_id, pwd, host_name, src_dbname, sql_store)
df_store.head(2)

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-14 23:57:12
1,2,2,2,2006-02-14 23:57:12


In [12]:
# address 
sql_address = "SELECT * FROM sakila.address;"
df_address = get_dataframe(user_id, pwd, host_name, src_dbname, sql_address)
df_address.head(2)

Unnamed: 0,address_id,address,address2,district,city_id,postal_code,phone,location,last_update
0,1,47 MySakila Drive,,Alberta,300,,,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00>\n2]c4\...,2014-09-25 18:30:27
1,2,28 MySQL Boulevard,,QLD,576,,,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x8e\x10...,2014-09-25 18:30:09


#### 1.3. Perform Any Necessary Transformations

In [13]:
# customer
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['create_date','last_update']
df_customer.drop(drop_cols, axis=1, inplace=True)

# 2. Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_customer.rename(columns={"id":"customer_id"}, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_customer.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email,address_id,active
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1


In [14]:
# film 
drop_cols = ['description','original_language_id', 'special_features','last_update']
df_film.drop(drop_cols, axis=1, inplace=True)

df_film.rename(columns={"id":"film_id"}, inplace=True)

df_film.insert(0, "film_key", range(1, df_film.shape[0]+1))

df_film.head(2)

Unnamed: 0,film_key,film_id,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating
0,1,1,ACADEMY DINOSAUR,2006,1,6,0.99,86,20.99,PG
1,2,2,ACE GOLDFINGER,2006,1,3,4.99,48,12.99,G


In [15]:
# inventory 
drop_cols = ['last_update']
df_inventory.drop(drop_cols, axis=1, inplace=True)

df_inventory.rename(columns={"id":"inventory_id"}, inplace=True)

df_inventory.insert(0, "inventory_key", range(1, df_inventory.shape[0]+1))

df_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id
0,1,1,1,1
1,2,2,1,1


In [16]:
# payment 
drop_cols = ['last_update','staff_id']
df_payment.drop(drop_cols, axis=1, inplace=True)

df_payment.rename(columns={"id":"payment_id"}, inplace=True)

df_payment.insert(0, "payment_key", range(1, df_payment.shape[0]+1))

df_payment.head(2)

Unnamed: 0,payment_key,payment_id,customer_id,rental_id,amount,payment_date
0,1,1,1,76,2.99,2005-05-25 11:30:37
1,2,2,1,573,0.99,2005-05-28 10:35:23


In [17]:
# rental
drop_cols = ['last_update', 'staff_id']
df_rental.drop(drop_cols, axis=1, inplace=True)

df_rental.rename(columns={"id":"rental_id"}, inplace=True)

df_rental.insert(0, "rental_key", range(1, df_rental.shape[0]+1))

df_rental.head(2)

Unnamed: 0,rental_key,rental_id,rental_date,inventory_id,customer_id,return_date
0,1,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30
1,2,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33


In [18]:
drop_cols = ['last_update', 'manager_staff_id']
df_store.drop(drop_cols, axis=1, inplace=True)

df_store.rename(columns={"id":"store_id"}, inplace=True)

df_store.insert(0, "store_key", range(1, df_store.shape[0]+1))

df_store.head(2)

Unnamed: 0,store_key,store_id,address_id
0,1,1,1
1,2,2,2


In [19]:
# address 
drop_cols = ['address2', 'city_id', 'postal_code', 'phone', 'location', 'last_update']
df_address.drop(drop_cols, axis=1, inplace=True)

df_address.rename(columns={"id":"address_id"}, inplace=True)

df_address.insert(0, "address_key", range(1, df_address.shape[0]+1))

df_address.head(2)

Unnamed: 0,address_key,address_id,address,district
0,1,1,47 MySakila Drive,Alberta
1,2,2,28 MySQL Boulevard,QLD


#### 1.4. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables
Here we demonstrate how an iterable data structure can be created containing the values needed to correctly create and populate the new dimension tables. If you inspect this code listing carefully, you'll notice that it's a **list** containing a **set** *(or vector)* for each dimension table. Each **set** then contains the *table_name* we need to assign to the table, the *pandas DataFrame* we crafted to define & populate the table, and the name we need to assign to the *primary_key* column.  With this *list of sets* defined, we can then call our **set_dataframe( )** function from within a **for *loop*** to create each *dimension* table.

In [20]:
db_operation = "insert"

tables = [('dim_customer', df_customer, 'customer_key'),
          ('dim_film', df_film, 'film_key'),
          ('dim_inventory', df_inventory, 'inventory_key'),
          ('dim_payment', df_payment, 'payment_key'),
         ('dim_rental', df_rental, 'rental_key'),
         ('dim_store', df_store, 'store_key'),
         ('dim_address', df_address, 'address_key')]

In [21]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### 2.0. Create & Populate the Fact Table
Here we will learn two approaches to creating the fact_rental fact table. The first approach demonstrates that a carefully crafted SQL SELECT statement can be used to perform this task... but what fun would that be. Seriously though, this approach is quick and effect if you already have the query, but what if you didn't have the opportunity to view and work with the data beforehand? What's more, you may be required to combine data from multiple sources, some of which may not be relational database management systems. Then, a simple SQL query won't do! You would need to load the data from the various sources (e.g., database tables, CSV or JSON files, NoSQL document collections, API stream data) and then combine them into a single dataframe that you could then use to create a new database table. For this reason we'll see how we can retrieve the data, but we won't bother to use it for creating a new table... we already know how to do that using the set_dataframe( ) function anyway.

### Implement the solution using Pandas DataFrames to craft the table

#### 2.1. Get all the data from each of the tables involved

In [22]:
# merge store and address 
df_fact_rental = pd.merge(df_store, df_address, on= "address_id", how = "inner")
df_fact_rental.drop(['address_id'], axis = 1, inplace = True)
df_fact_rental.head(2)

Unnamed: 0,store_key,store_id,address_key,address,district
0,1,1,1,47 MySakila Drive,Alberta
1,2,2,2,28 MySQL Boulevard,QLD


In [23]:
# merge fact_rental and customer 
df_fact_rental = pd.merge(df_fact_rental, df_customer, on= "store_id", how = "inner")
df_fact_rental.drop(['address_id'], axis = 1, inplace = True)
df_fact_rental.drop(['store_id'], axis = 1, inplace = True)
df_fact_rental.head(2)

Unnamed: 0,store_key,address_key,address,district,customer_key,customer_id,first_name,last_name,email,active
0,1,1,47 MySakila Drive,Alberta,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1
1,1,1,47 MySakila Drive,Alberta,2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1


In [24]:
# merge fact_rental and payment 
df_fact_rental = pd.merge(df_fact_rental, df_payment, on= "customer_id", how = "inner")
df_fact_rental.drop(['customer_id'], axis = 1, inplace = True)
df_fact_rental.head(2)

Unnamed: 0,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,payment_key,payment_id,rental_id,amount,payment_date
0,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1,1,76,2.99,2005-05-25 11:30:37
1,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2,2,573,0.99,2005-05-28 10:35:23


In [25]:
# merge fact_rental and rental 
df_fact_rental = pd.merge(df_fact_rental, df_rental, on= "rental_id", how = "inner")
df_fact_rental.drop(['payment_id'], axis = 1, inplace = True)
df_fact_rental.head(2)

Unnamed: 0,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,payment_key,rental_id,amount,payment_date,rental_key,rental_date,inventory_id,customer_id,return_date
0,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1,76,2.99,2005-05-25 11:30:37,76,2005-05-25 11:30:37,3021,1,2005-06-03 12:00:37
1,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2,573,0.99,2005-05-28 10:35:23,572,2005-05-28 10:35:23,4020,1,2005-06-03 06:32:23


In [26]:
# merge fact_rental and inventory 
df_fact_rental = pd.merge(df_fact_rental, df_inventory, on= "inventory_id", how = "inner")
df_fact_rental.drop(['rental_id'], axis = 1, inplace = True)
df_fact_rental.head(2)

Unnamed: 0,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,payment_key,amount,payment_date,rental_key,rental_date,inventory_id,customer_id,return_date,inventory_key,film_id,store_id
0,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1,2.99,2005-05-25 11:30:37,76,2005-05-25 11:30:37,3021,1,2005-06-03 12:00:37,3021,663,2
1,1,1,47 MySakila Drive,Alberta,383,MARTIN,BALES,MARTIN.BALES@sakilacustomer.org,1,10384,0.99,2005-08-02 08:49:09,11118,2005-08-02 08:49:09,3021,383,2005-08-08 04:33:09,3021,663,2


In [27]:
# merge fact_rental and film
df_fact_rental = pd.merge(df_fact_rental, df_film, on= "film_id", how = "inner")
df_fact_rental.drop(['customer_id'], axis = 1, inplace = True)
df_fact_rental.drop(['store_id'], axis = 1, inplace = True)
df_fact_rental.drop(['inventory_id'], axis = 1, inplace = True)
df_fact_rental.head(2)

Unnamed: 0,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,payment_key,...,film_id,film_key,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating
0,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1,...,663,663,PATIENT SISTER,2006,1,7,0.99,99,29.99,NC-17
1,1,1,47 MySakila Drive,Alberta,383,MARTIN,BALES,MARTIN.BALES@sakilacustomer.org,1,10384,...,663,663,PATIENT SISTER,2006,1,7,0.99,99,29.99,NC-17


In [28]:
df_fact_rental.drop(['film_id'], axis = 1, inplace = True)

In [29]:
df_fact_rental.head(2) # verifying fact table 

Unnamed: 0,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,payment_key,...,inventory_key,film_key,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating
0,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1,...,3021,663,PATIENT SISTER,2006,1,7,0.99,99,29.99,NC-17
1,1,1,47 MySakila Drive,Alberta,383,MARTIN,BALES,MARTIN.BALES@sakilacustomer.org,1,10384,...,3021,663,PATIENT SISTER,2006,1,7,0.99,99,29.99,NC-17


##### 2.5.3. Lookup the DateKeys from the Date Dimension Table.
First, fetch the Surrogate Primary Key (date_key) and the Business Key (full_date) from the Date Dimension table using the **get_dataframe()** function. Be certain to cast the **full_date** column to the **datetime64[ns]** data type using the **.astype()** function that is native to Pandas DataFrame columns. Also, extract the **date** portion using the **.dt.date** attribute of the **datetime64[ns]** datatype.

In [30]:
sql_dim_date = "SELECT date_key, full_date FROM sakila.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


Next, for each **date** typed column in the Fact table, lookup the corresponding Primary Key column. Be certain to cast each **date** column to the **datetime64[ns]** data type using the **.astype()** function that's native to Pandas DataFrame columns. Also, extract the **date** portion using the **.dt.date** attribute.

In [31]:
# rental date 
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on='rental_date', how='left')
df_fact_rental.drop(['rental_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,payment_key,...,film_key,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,rental_date_key
0,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1,...,663,PATIENT SISTER,2006,1,7,0.99,99,29.99,NC-17,
1,1,1,47 MySakila Drive,Alberta,383,MARTIN,BALES,MARTIN.BALES@sakilacustomer.org,1,10384,...,663,PATIENT SISTER,2006,1,7,0.99,99,29.99,NC-17,


In [32]:
# return date 
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_return_date, on='return_date', how='left')
df_fact_rental.drop(['return_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,payment_key,...,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,rental_date_key,return_date_key
0,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1,...,PATIENT SISTER,2006,1,7,0.99,99,29.99,NC-17,,
1,1,1,47 MySakila Drive,Alberta,383,MARTIN,BALES,MARTIN.BALES@sakilacustomer.org,1,10384,...,PATIENT SISTER,2006,1,7,0.99,99,29.99,NC-17,,


In [33]:
# payment date 
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_payment_date, on='payment_date', how='left')
df_fact_rental.drop(['payment_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,payment_key,...,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,rental_date_key,return_date_key,payment_date_key
0,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1,...,2006,1,7,0.99,99,29.99,NC-17,,,
1,1,1,47 MySakila Drive,Alberta,383,MARTIN,BALES,MARTIN.BALES@sakilacustomer.org,1,10384,...,2006,1,7,0.99,99,29.99,NC-17,,,


#### 2.7. Write the DataFrame Back to the Database

In [34]:
table_name = "fact_rental"
primary_key = "rental_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_rental, table_name, primary_key, db_operation)
df_fact_rental.head()

Unnamed: 0,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,payment_key,...,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,rental_date_key,return_date_key,payment_date_key
0,1,1,47 MySakila Drive,Alberta,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1,...,2006,1,7,0.99,99,29.99,NC-17,,,
1,1,1,47 MySakila Drive,Alberta,383,MARTIN,BALES,MARTIN.BALES@sakilacustomer.org,1,10384,...,2006,1,7,0.99,99,29.99,NC-17,,,
2,1,1,47 MySakila Drive,Alberta,547,KURT,EMMONS,KURT.EMMONS@sakilacustomer.org,1,14700,...,2006,1,7,0.99,99,29.99,NC-17,,,
3,2,2,28 MySQL Boulevard,QLD,210,ELLA,OLIVER,ELLA.OLIVER@sakilacustomer.org,1,5694,...,2006,1,7,0.99,99,29.99,NC-17,,,
4,2,2,28 MySQL Boulevard,QLD,347,RYAN,SALISBURY,RYAN.SALISBURY@sakilacustomer.org,1,9367,...,2006,1,7,0.99,99,29.99,NC-17,,,


### 3.0. Demonstrate that the New Data Warehouse Exists and Contains the Correct Data

In [35]:
# top 5 movies with the highest rental revenue 
sql_test = """
    SELECT films.`title` AS `film_title`,
        SUM(fact_rental.`amount`) AS `total_rental_price`
    FROM 
        `{0}`.`fact_rental` as fact_rental
    INNER JOIN 
        `{0}`.`dim_film` AS films
    ON 
       fact_rental.film_key = films.film_id
    GROUP BY 
        films.`title`
    ORDER BY 
        total_rental_price DESC;
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

Unnamed: 0,film_title,total_rental_price
0,TELEGRAPH VOYAGE,231.73
1,WIFE TURN,223.69
2,ZORRO ARK,214.69
3,GOODFELLAS SALUTE,209.69
4,SATURDAY LAMBS,204.72


## 2. MongoDB section
### Run copy_dim_date.sql file before running this section!!!!!! 

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [36]:
mysql_args = {
    "uid" : "root",
    "pwd" : "Passw0rd123",
    "hostname" : "localhost",
    "dbname" : "sakila_dw2"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "",
    "password" : "password",
    "cluster_name" : "cluster_name",
    "cluster_subnet" : "xxxxx",
    "cluster_location" : "local", # "local"
    "db_name" : "sakila_rentals"
}

#### Define Functions for Getting Data From and Setting Data Into Databases

In [37]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Populate MongoDB with Source Data
You only need to run this cell once; however, the operation is *idempotent*.  In other words, it can be run multiple times without changing the end result.

In [38]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"film" : 'sakila_film.json',
              "rental" : 'sakila_rental.json',
              "payment" : 'sakila_payment.json',
              "fact_rental" :'fact_rental.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)  

### 1.0. Create and Populate the New Dimension Tables
#### 1.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [39]:
# film
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "film"

df_film = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


In [40]:
# rental
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "rental"

df_rental = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 16:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 16:30:53


In [41]:
 # payment 
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "payment"

df_payment = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 17:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 17:12:30


#### 1.2. Lookup the Invoice Date Keys from the Date Dimension Table.
Here we see an example of where a dimension cross-references another dimension; the Date dimension.  The Date dimension is a classic example of a **Role-Playing dimension**. Dates in-and-of themselves are universal; however, when applied to specific events they take on the identity of those events. Here, the *dim_date* table takes on the identity of *dim_invoice_date* to supply invoice date keys to the *dim_invoices* table.


##### 1.2.1. Get the Data from the Date Dimension Table.
First, fetch the Surrogate Primary Key (date_key) and the Business Key (full_date) from the Date Dimension table using the **get_sql_dataframe()** function. Be certain to cast the **full_date** column to the **datetime64[ns]** data type using the **.astype()** function that is native to Pandas DataFrame columns. Also, extract the **date** portion using the **.dt.date** attribute of the **datetime64[ns]** datatype.

In [42]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw2.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


##### 1.2.2. Lookup the Surrogate Primary Key (date_key) that Corresponds to the payment_date Column

In [43]:
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_payment.payment_date = df_payment.payment_date.astype('datetime64[ns]').dt.date
df_payment = pd.merge(df_payment, df_dim_payment_date, on='payment_date', how='left')
df_payment.drop(['payment_date'], axis=1, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,last_update,payment_date_key
0,1,1,1,76,2.99,2006-02-15 17:12:30,20050525
1,2,1,1,573,0.99,2006-02-15 17:12:30,20050528


In [44]:
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_rental.return_date = df_rental.return_date.astype('datetime64[ns]').dt.date
df_rental = pd.merge(df_rental, df_dim_return_date, on='return_date', how='left')
df_rental.drop(['return_date'], axis=1, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,staff_id,last_update,return_date_key
0,1,2005-05-24 22:53:30,367,130,1,2006-02-15 16:30:53,20050526
1,2,2005-05-24 22:54:33,1525,459,1,2006-02-15 16:30:53,20050528


#### 1.3. Perform Any Necessary Transformations to the DataFrames

In [46]:
# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_film.insert(0, "film_key", range(1, df_film.shape[0]+1))
df_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


In [47]:
df_rental.insert(0, "rental_key", range(1, df_rental.shape[0]+1))
df_rental.head(2)

Unnamed: 0,rental_key,rental_id,rental_date,inventory_id,customer_id,staff_id,last_update,return_date_key
0,1,1,2005-05-24 22:53:30,367,130,1,2006-02-15 16:30:53,20050526
1,2,2,2005-05-24 22:54:33,1525,459,1,2006-02-15 16:30:53,20050528


In [48]:
df_payment.insert(0, "payment_key", range(1, df_payment.shape[0]+1))
df_payment.head(2)

Unnamed: 0,payment_key,payment_id,customer_id,staff_id,rental_id,amount,last_update,payment_date_key
0,1,1,1,1,76,2.99,2006-02-15 17:12:30,20050525
1,2,2,1,1,573,0.99,2006-02-15 17:12:30,20050528


#### 1.3. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables
Here we will call our **set_dataframe( )** function to create each dimension table. This function expects a number of parameters including the usual connection information (e.g., user_id, password, MySQL server name and database), the *table_name* we need to assign to the table, the *pandas DataFrame* we crafted to define & populate the table, the *name* of the column we wish to designate as the *primary_key* column, and finally, the database operation (insert or update). 

In [49]:
dataframe = df_rental
table_name = 'dim_rental'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [50]:
dataframe = df_film
table_name = 'dim_film'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [51]:
dataframe = df_payment
table_name = 'dim_payment'
primary_key = 'payment_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### 1.4. Validate that the New Dimension Tables were Created.

In [52]:
sql_rental = "SELECT * FROM sakila_dw2.dim_rental;"
df_dim_rental = get_sql_dataframe(sql_rental, **mysql_args)
df_dim_rental.head(2)

Unnamed: 0,rental_key,rental_id,rental_date,inventory_id,customer_id,staff_id,last_update,return_date_key
0,1,1,2005-05-24 22:53:30,367,130,1,2006-02-15 16:30:53,20050526
1,2,2,2005-05-24 22:54:33,1525,459,1,2006-02-15 16:30:53,20050528


In [53]:
sql_film = "SELECT * FROM sakila_dw2.dim_film;"
df_dim_film = get_sql_dataframe(sql_film, **mysql_args)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


In [54]:
sql_payment = "SELECT * FROM sakila_dw2.dim_payment;"
df_dim_payment = get_sql_dataframe(sql_payment, **mysql_args)
df_dim_payment.head(2)

Unnamed: 0,payment_key,payment_id,customer_id,staff_id,rental_id,amount,last_update,payment_date_key
0,1,1,1,1,76,2.99,2006-02-15 17:12:30,20050525
1,2,2,1,1,573,0.99,2006-02-15 17:12:30,20050528


### 2.0. Create and Populate the New Fact Tables
#### 2.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [55]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "fact_rental"

df_fact_rental = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_fact_rental.head(2)

Unnamed: 0,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,payment_key,...,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,rental_date_key,return_date_key,payment_date_key
0,1,1,47 MySakila Drive,Alberta,130,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,1,3503,...,2006,1,7,2.99,148,21.99,G,,,
1,1,1,47 MySakila Drive,Alberta,459,TOMMY,COLLAZO,TOMMY.COLLAZO@sakilacustomer.org,1,12374,...,2006,1,7,2.99,126,16.99,R,,,


#### 2.3. Lookup the Primary Keys from the Dimension Tables
**Foreign key relationships** must be established between each newly-crafted **Fact table** and each related **Dimension table**.

##### 2.3.1. First, fetch the Surrogate Primary Key and the Business Key from each Dimension table.

In [56]:
sql_dim_payment = "SELECT payment_key, payment_id FROM sakila_dw2.dim_payment;"
df_dim_payment = get_sql_dataframe(sql_dim_payment, **mysql_args)
df_dim_payment.head(2)

Unnamed: 0,payment_key,payment_id
0,1,1
1,2,2


In [57]:
# TODO: Extract the 'primary key' and the 'business key' from your new "dim_film" dimension table
sql_dim_film = "SELECT film_key, film_id FROM sakila_dw2.dim_film;"
df_dim_film = get_sql_dataframe(sql_dim_film, **mysql_args)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id
0,1,1
1,2,2


In [58]:
# TODO: Extract the 'primary key' and the 'business key' from the "fact_rental" fact table
sql_dim_rental = "SELECT rental_key, rental_id FROM sakila_dw2.dim_rental;"
df_dim_rental = get_sql_dataframe(sql_dim_rental, **mysql_args)
df_dim_rental.head(2)

Unnamed: 0,rental_key,rental_id
0,1,1
1,2,2


In [59]:
df_fact_rental.insert(0, "fact_rental_key", range(1, len(df_fact_rental) + 1))
df_fact_rental.head(2)

Unnamed: 0,fact_rental_key,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,...,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,rental_date_key,return_date_key,payment_date_key
0,1,1,1,47 MySakila Drive,Alberta,130,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,1,...,2006,1,7,2.99,148,21.99,G,,,
1,2,1,1,47 MySakila Drive,Alberta,459,TOMMY,COLLAZO,TOMMY.COLLAZO@sakilacustomer.org,1,...,2006,1,7,2.99,126,16.99,R,,,


#### 2.5. Load Newly Transformed MongoDB Data into the Sakila_DW2 Data Warehouse

In [60]:
dataframe = df_fact_rental
table_name = 'fact_rental'
primary_key = 'fact_rental_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### 2.6. Validate that the New Fact Tables were Created

In [62]:
# TODO: Validate the correctness of the new "rentals" fact table.
sql_fact_rental = "SELECT * FROM sakila_dw2.fact_rental;"
df_fact_rental = get_sql_dataframe(sql_fact_rental, **mysql_args)
df_fact_rental.head(2)

Unnamed: 0,fact_rental_key,store_key,address_key,address,district,customer_key,first_name,last_name,email,active,...,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,rental_date_key,return_date_key,payment_date_key
0,1,1,1,47 MySakila Drive,Alberta,130,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,1,...,2006,1,7,2.99,148,21.99,G,,,
1,2,1,1,47 MySakila Drive,Alberta,459,TOMMY,COLLAZO,TOMMY.COLLAZO@sakilacustomer.org,1,...,2006,1,7,2.99,126,16.99,R,,,


### 3.0. Demonstrate that the New Data Warehouse Exists and Contains the Correct Data

In [63]:
# movies ordered by total rental price 

sql_fact_rental = """
    SELECT dim_film.title,
        SUM(fact_rental.amount) AS total_rental_price
    FROM 
        fact_rental
    INNER JOIN 
        dim_film
    ON 
       fact_rental.film_key = dim_film.film_id
    GROUP BY 
        dim_film.title
    ORDER BY 
        total_rental_price DESC;
"""


In [64]:
df_fact_rental = get_sql_dataframe(sql_fact_rental, **mysql_args)
df_fact_rental

Unnamed: 0,title,total_rental_price
0,SEATTLE EXPECATIONS,31.96
1,MILLION ACE,26.97
2,ROSES TREASURE,24.96
3,VIDEOTAPE ARSENIC,24.96
4,BOOGIE AMELIE,22.96
...,...,...
625,TOWERS HURRICANE,0.99
626,VIETNAM SMOOCHY,0.99
627,VOICE PEACH,0.99
628,WATCH TRACY,0.99
