In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.10.1


### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [3]:
mysql_args = {
    "uid" : "root",
    "pwd" : "password1!",
    "hostname" : "localhost",
    "dbname" : "sakila2"
}

dst_dbname = "sakila2"

## x5fzr8obH9U8oD5O
#hfx6mh
#R1UC1iTyq66qPXbn
# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "kimmy",
    "password" : "pwd1",
    "cluster_name" : "midtermporject",
    "cluster_subnet" : "jmyqn",
    "cluster_location" : "local", # "local"
    "db_name" : "sakila2"
}

### Create a new Data Warehouse Database

In [4]:
conn_str = f"mysql+pymysql://{'root'}:{'password1!'}@{'localhost'}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

### Define Functions for Getting Data From and Setting Data Into Databases

In [5]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

In [6]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'ds2002_data')
json_files = {
                "film" : 'film.json',
                "inventory" : 'inventory.json',
                "store" : 'store.json', 
                "rental" : 'rental.json', 
                "customer" : 'customer.json',
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files) 

## Extract

In [7]:
client = get_mongo_client(**mongodb_args)
query = {} # Select all elements (columns), and all documents (rows).
collection = "film"

df_film = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [8]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "inventory"

df_inventory = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [9]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "store"

df_store = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_store.head(2)

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-15 04:57:12
1,2,2,2,2006-02-15 04:57:12


In [10]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "rental"

df_rental = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [11]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "customer"

df_customer = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


## Get Data from Date Dimension Table

In [13]:
sql_dim_date = "SELECT date_key, full_date FROM sakila2.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


##### Lookup the Surrogate Primary Key (date_key) that Corresponds to the invoice_date Column

In [14]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_return_date_key", "full_date" : "return_date"})
df_rental.return_date = df_rental.return_date.astype('datetime64[ns]').dt.date
df_rental = pd.merge(df_rental, df_dim_rental_date, on = 'return_date', how='left')
df_rental.drop(['return_date'], axis=1, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,staff_id,last_update,rental_return_date_key
0,1,2005-05-24 22:53:30,367,130,1,2006-02-15 21:30:53,20050526
1,2,2005-05-24 22:54:33,1525,459,1,2006-02-15 21:30:53,20050528


### Perform necessary transactions to the DataFrame

In [15]:
# 1. Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_rental.rename(columns={"id":"rental_id"}, inplace=True)

# 2. Since there are no values in the 'due_date' column go ahead and drop it.
#df_rental.drop(['due_date'], axis=1, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_rental.insert(0, "rental_key", range(1, df_rental.shape[0]+1))
df_rental.head(2)

Unnamed: 0,rental_key,rental_id,rental_date,inventory_id,customer_id,staff_id,last_update,rental_return_date_key
0,1,1,2005-05-24 22:53:30,367,130,1,2006-02-15 21:30:53,20050526
1,2,2,2005-05-24 22:54:33,1525,459,1,2006-02-15 21:30:53,20050528


In [16]:
df_film.rename(columns={"id":"film_id"}, inplace=True)
df_film.drop(['original_language_id'], axis=1, inplace=True)
df_film.insert(0, "film_key", range(1, df_film.shape[0]+1))
df_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [17]:
df_inventory.rename(columns={"id":"inventory_id"}, inplace=True)
df_inventory.insert(0, "inventory_key", range(1, df_inventory.shape[0]+1))
df_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id,last_update
0,1,1,1,1,2006-02-15 05:09:17
1,2,2,1,1,2006-02-15 05:09:17


In [18]:
df_store.insert(0, "store_key", range(1, df_store.shape[0]+1))
df_store.head(5)

Unnamed: 0,store_key,store_id,manager_staff_id,address_id,last_update
0,1,1,1,1,2006-02-15 04:57:12
1,2,2,2,2,2006-02-15 04:57:12


In [19]:
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0]+1))
df_customer.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


### Load the Transformed JSON DataFrames into the New Data Warehouse by Creating New Tables


In [20]:
dataframe = df_film
table_name = 'dim_film'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [21]:
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [22]:
dataframe = df_store
table_name = 'dim_store'
primary_key = 'store_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [23]:
dataframe = df_rental
table_name = 'dim_rental'
primary_key = 'rental_key'
db_operation = "insert"
df_rental.head(2)

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [24]:
df_rental.head(2)

Unnamed: 0,rental_key,rental_id,rental_date,inventory_id,customer_id,staff_id,last_update,rental_return_date_key
0,1,1,2005-05-24 22:53:30,367,130,1,2006-02-15 21:30:53,20050526
1,2,2,2005-05-24 22:54:33,1525,459,1,2006-02-15 21:30:53,20050528


In [25]:
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

### Validate Tables Were Created 

In [26]:
sql_invoices = "SELECT * FROM sakila2.dim_film;"
df_dim_film = get_sql_dataframe(sql_invoices, **mysql_args)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [27]:
sql_invoices = "SELECT * FROM sakila2.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_invoices, **mysql_args)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id,last_update
0,1,1,1,1,2006-02-15 05:09:17
1,2,2,1,1,2006-02-15 05:09:17


In [28]:
sql_invoices = "SELECT * FROM sakila2.dim_store;"
df_dim_store = get_sql_dataframe(sql_invoices, **mysql_args)
df_dim_store.head(2)

Unnamed: 0,store_key,store_id,manager_staff_id,address_id,last_update
0,1,1,1,1,2006-02-15 04:57:12
1,2,2,2,2,2006-02-15 04:57:12


In [29]:
sql_invoices = "SELECT * FROM sakila2.dim_rental;"
df_dim_rental = get_sql_dataframe(sql_invoices, **mysql_args)
df_dim_rental.head(2)

Unnamed: 0,rental_key,rental_id,rental_date,inventory_id,customer_id,staff_id,last_update,rental_return_date_key
0,1,1,2005-05-24 22:53:30,367,130,1,2006-02-15 21:30:53,20050526
1,2,2,2005-05-24 22:54:33,1525,459,1,2006-02-15 21:30:53,20050528


In [30]:
sql_invoices = "SELECT * FROM sakila2.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_invoices, **mysql_args)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


### Reading CSV File 

In [31]:
data_dir = os.path.join(os.getcwd(), 'ds2002_data')
data_file = os.path.join(data_dir, 'film_category.csv')

df_film_category = pd.read_csv(data_file, header=0, index_col=0)
df_film_category.head(2)

Unnamed: 0_level_0,category_id,last_update
film_id,Unnamed: 1_level_1,Unnamed: 2_level_1
1,6,2006-02-15 05:07:09
2,11,2006-02-15 05:07:09


In [32]:
data_dir = os.path.join(os.getcwd(), 'ds2002_data')
data_file = os.path.join(data_dir, 'category.csv')

df_category = pd.read_csv(data_file, header=0, index_col=0)
df_category.head(2)

Unnamed: 0_level_0,name,last_update
category_id,Unnamed: 1_level_1,Unnamed: 2_level_1
1,Action,2006-02-15 04:46:27
2,Animation,2006-02-15 04:46:27


### Transformations

In [33]:
df_film_category = df_film_category.reset_index()
df_film_and_category = pd.merge(df_film_category, df_category, on='category_id', how='inner')
df_film_and_category = df_film_and_category.drop('last_update_x', axis=1)
df_film_and_category = df_film_and_category.drop('last_update_y', axis=1)
df_film_and_category['film_key'] = df_film_and_category['film_id']
df_film_and_category.head()

Unnamed: 0,film_id,category_id,name,film_key
0,1,6,Documentary,1
1,3,6,Documentary,3
2,40,6,Documentary,40
3,58,6,Documentary,58
4,62,6,Documentary,62


In [34]:
df_film = pd.merge(df_film, df_film_and_category, on='film_key', how='left')
columns_to_drop = [col for col in df_film.columns if col.endswith('_x') or col.endswith('_y')]
df_film.drop(columns=columns_to_drop, inplace=True)
df_film = df_film.rename(columns={"name":"category_name"})
df_film['film_id'] = df_film['film_key']
df_film.head(2)

Unnamed: 0,film_key,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update,category_id,category_name,film_id
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary,1
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42,11,Horror,2


In [35]:
df_inventory = pd.merge(df_inventory, df_film, on ='film_id', how='left')
columns_to_drop = [col for col in df_film.columns if col.endswith('_x') or col.endswith('_y')]
df_inventory.drop(columns=columns_to_drop, inplace=True)

df_inventory.head()

Unnamed: 0,inventory_key,inventory_id,film_id,store_id,last_update_x,film_key,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_y,category_id,category_name
0,1,1,1,1,2006-02-15 05:09:17,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
1,2,2,1,1,2006-02-15 05:09:17,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
2,3,3,1,1,2006-02-15 05:09:17,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
3,4,4,1,1,2006-02-15 05:09:17,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
4,5,5,1,2,2006-02-15 05:09:17,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary


### Write back to SQL 

In [36]:
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [37]:
dataframe = df_film
table_name = 'dim_film'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

### Validate 

In [38]:
sql_invoices = "SELECT * FROM sakila2.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_invoices, **mysql_args)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id,last_update_x,film_key,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_y,category_id,category_name
0,1,1,1,1,2006-02-15 05:09:17,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
1,2,2,1,1,2006-02-15 05:09:17,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary


In [39]:
sql_invoices = "SELECT * FROM sakila2.dim_film;"
df_dim_film = get_sql_dataframe(sql_invoices, **mysql_args)
df_dim_film.head(2)

Unnamed: 0,film_key,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update,category_id,category_name,film_id
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary,1
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42,11,Horror,2


## Create and Populate the New Fact Tables

#### Extract Data from the Source MongoDB Collections Into DataFrames

In [40]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "rental"

df_fact_rental = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_fact_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [41]:
## Lookup the Surrogate Primary Key (date_key) that Corresponds to the "rental_date" Column.
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rental.rental_date = df_fact_rental.rental_date.astype('datetime64[ns]').dt.date
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on='rental_date', how='left')
df_fact_rental.drop(['rental_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,return_date,staff_id,last_update,rental_date_key
0,1,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,20050524
1,2,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,20050524


In [42]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "return_date" Column.
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rental.return_date = df_fact_rental.return_date.astype('datetime64[ns]').dt.date
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on='return_date', how='left')
df_fact_rental.drop(['return_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,last_update,rental_date_key,return_date_key
0,1,367,130,1,2006-02-15 21:30:53,20050524,20050526
1,2,1525,459,1,2006-02-15 21:30:53,20050524,20050528


In [43]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "last_update" Column.
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_fact_rental.last_update = df_fact_rental.last_update.astype('datetime64[ns]').dt.date
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on='last_update', how='left')
df_fact_rental.drop(['last_update'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,rental_date_key,return_date_key,last_update_key
0,1,367,130,1,20050524,20050526,20060215
1,2,1525,459,1,20050524,20050528,20060215


### Business Keys to lookup corresponding Surrogate Primary Keys

In [44]:
sql_dim_film = "SELECT film_key, film_id FROM sakila2.dim_film;"
df_dim_film = get_sql_dataframe(sql_dim_film, **mysql_args)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id
0,1,1
1,2,2


In [45]:
sql_dim_inventory = "SELECT inventory_key, inventory_id, film_id FROM sakila2.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_dim_inventory, **mysql_args)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id
0,1,1,1
1,2,2,1


In [46]:
sql_dim_store = "SELECT store_key, store_id  FROM sakila2.dim_store;"
df_dim_store = get_sql_dataframe(sql_dim_store, **mysql_args)
df_dim_store.head(2)

Unnamed: 0,store_key,store_id
0,1,1
1,2,2


In [47]:
sql_dim_customer = "SELECT customer_key, customer_id FROM sakila2.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_dim_customer, **mysql_args)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2


### Merging

In [48]:
df_fact_rental = pd.merge(df_fact_rental, df_dim_inventory, on='inventory_id', how='inner')
df_fact_rental.drop(['inventory_id'], axis=1, inplace=True)
df_fact_rental.head()

Unnamed: 0,rental_id,customer_id,staff_id,rental_date_key,return_date_key,last_update_key,inventory_key,film_id
0,1,130,1,20050524,20050526,20060215,367,80
1,16,316,2,20050525,20050526,20060215,389,86
2,17,575,1,20050525,20050527,20060215,830,181
3,21,388,2,20050525,20050526,20060215,146,31
4,22,509,2,20050525,20050526,20060215,727,159


In [49]:
# Insert a new 'fact_inventory_transaction_key' column, with an ever-incrementing
# numeric value, to serve as the surrogate primary key. Then display the results.
df_fact_rental.insert(0, "fact_rental_key", range(1, df_fact_rental.shape[0]+1))
df_fact_rental.head()

Unnamed: 0,fact_rental_key,rental_id,customer_id,staff_id,rental_date_key,return_date_key,last_update_key,inventory_key,film_id
0,1,1,130,1,20050524,20050526,20060215,367,80
1,2,16,316,2,20050525,20050526,20060215,389,86
2,3,17,575,1,20050525,20050527,20060215,830,181
3,4,21,388,2,20050525,20050526,20060215,146,31
4,5,22,509,2,20050525,20050526,20060215,727,159


### Load DB back to Warehouse 

In [50]:
dataframe = df_fact_rental
table_name = 'fact_rental'
primary_key = 'fact_rental_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

### Validate Fact Table Created

In [53]:
sql_fact_rental = "SELECT * FROM sakila2.fact_rental;"
df_dim_rental = get_sql_dataframe(sql_fact_rental, **mysql_args)
df_dim_rental.head()

Unnamed: 0,fact_rental_key,rental_id,customer_id,staff_id,rental_date_key,return_date_key,last_update_key,inventory_key,film_id
0,1,1,130,1,20050524,20050526,20060215,367,80
1,2,16,316,2,20050525,20050526,20060215,389,86
2,3,17,575,1,20050525,20050527,20060215,830,181
3,4,21,388,2,20050525,20050526,20060215,146,31
4,5,22,509,2,20050525,20050526,20060215,727,159


### Demonstrate New Data Warehouse Exists

In [82]:
sql_purchase_orders = """ 
    SELECT 
        fr.film_id,
        fi.title AS film_title,
        c.customer_id,
        CONCAT(c.first_name, ' ', c.last_name) AS customer_name,
        COUNT(fr.rental_id) AS total_rentals_for_film,
        SUM(COUNT(fr.rental_id)) OVER (PARTITION BY c.customer_id) AS total_rentals_for_customer

    FROM 
        fact_rental fr
    JOIN 
        dim_inventory i ON fr.inventory_key = i.inventory_key
    JOIN 
        dim_film fi ON i.film_id = fi.film_key
    JOIN 
        dim_customer c ON fr.customer_id = c.customer_id
    GROUP BY 
        fr.film_id, fi.title, c.customer_id, customer_name
    ORDER BY 
        total_rentals_for_customer DESC;
"""

df_fact_purchase_orders = get_sql_dataframe(sql_purchase_orders, **mysql_args)
df_fact_purchase_orders

Unnamed: 0,film_id,film_title,customer_id,customer_name,total_rentals_for_film,total_rentals_for_customer
0,98,BRIGHT ENCOUNTERS,551,CLAYTON BARBEE,1,4.0
1,132,CHAINSAW UPTOWN,551,CLAYTON BARBEE,1,4.0
2,55,BARBARELLA STREETCAR,551,CLAYTON BARBEE,1,4.0
3,65,BEHAVIOR RUNAWAY,551,CLAYTON BARBEE,1,4.0
4,131,CENTER DINOSAUR,89,JULIA FLORES,1,3.0
...,...,...,...,...,...,...
207,109,BUTTERFLY CHOCOLAT,577,CLIFTON MALCOLM,1,1.0
208,100,BROOKLYN DESERT,584,SALVADOR TEEL,1,1.0
209,1,ACADEMY DINOSAUR,587,SERGIO STANFIELD,1,1.0
210,60,BEAST HUNCHBACK,595,TERRENCE GUNDERSON,1,1.0
