In [1]:
# Import the Necessary Libraries
import os
import numpy
import requests
import datetime
import json
import pandas as pd
import pymongo
from sqlalchemy import create_engine

In [2]:
# Declaring & Assigning Connection Variables for the MySQL Server & the sakila and sakila_2 Databases 
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_2"

In [3]:
# Define the get_dataframe and set_dataframe functions for Getting Data From sakila and Setting Data Into sakila_2
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

In [5]:
# Extracting Data from the Source sakila film table, and performing transformations such as
# dropping unnecessary columns ('original_language_id')
# renaming columns to conform with data warehouse design standards
sql_film = "SELECT * FROM sakila.film;"
df_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film)
drop_cols = ['original_language_id']
df_film.drop(drop_cols, axis=1, inplace=True)
df_film.rename(columns={"film_id":"film_key", "language_id":"language_key"}, inplace=True)
df_film.head(2)

Unnamed: 0,film_key,title,description,release_year,language_key,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


In [6]:
# Loading the Transformed Film dataframe into the New sakila_2 Warehouse as a new dimension table (dim_film)
dataframe = df_film
table_name = 'dim_film'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [7]:
# Extracting Data from the Source sakila film_actor table, and performing transformations such as
# renaming columns to conform with data warehouse design standards
# adding a column (film_actor_key) to assign as the unique primary key column 
sql_film_actor = "SELECT * FROM sakila.film_actor;"
df_film_actor = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film_actor)
df_film_actor.rename(columns={"film_id":"film_key", "actor_id":"actor_key"}, inplace=True)
df_film_actor['film_actor_key'] = range(1, len(df_film_actor) + 1)
df_film_actor.head(2)

Unnamed: 0,actor_key,film_key,last_update,film_actor_key
0,1,1,2006-02-15 00:05:03,1
1,1,23,2006-02-15 00:05:03,2


In [8]:
# Loading the Transformed film_actor dataframe into the New sakila_2 Warehouse as a new dimension table (dim_film_actor)
dataframe = df_film_actor
table_name = 'dim_film_actor'
primary_key = 'film_actor_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [9]:
# Extract Data from the Source sakila rental table, and performing transformations such as
# dropping unnecessary columns ('last_update', 'staff_id', 'rental_id') 
# og rental_id column is being dropped due to auto increment issues 
# renaming columns to conform with data warehouse design standards
# adding a column ('rental_key') to assign as the unique primary key column in place of rental_id
# reordering the columns in the df_rental dataframe
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
drop_cols = ['last_update', 'staff_id', 'rental_id']
df_rental.drop(drop_cols, axis=1, inplace=True)
df_rental.rename(columns={"inventory_id":"inventory_key", "customer_id":"customer_key"}, inplace=True)
df_rental['rental_key'] = range(1, len(df_rental) + 1)
df_rental = df_rental[['rental_key', 'rental_date','customer_key', 'inventory_key', 'return_date']]
df_rental.head(2)

Unnamed: 0,rental_key,rental_date,customer_key,inventory_key,return_date
0,1,2005-05-24 22:53:30,130,367,2005-05-26 22:04:30
1,2,2005-05-24 22:54:33,459,1525,2005-05-28 19:40:33


In [10]:
# Loading the Transformed rental dataframe into the New sakila_2 Warehouse as a new dimension table (dim_rental)
dataframe = df_rental
table_name = 'dim_rental'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [11]:
# Extracting/Retrieving Data from the local source file system sakila customer table, and performing transformations such as
# converting its original csv format into a SQL database table 
# dropping unnecessary columns ('last_update') 
# renaming columns to conform with data warehouse design standards
df_customer = pd.read_csv("sakila_customer.csv")
drop_cols = ['last_update']
df_customer.drop(drop_cols, axis=1, inplace=True)
df_customer.rename(columns={"customer_id":"customer_key", "store_id":"store_key", "address_id":"address_key"}, inplace=True)

df_customer.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,active,create_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36


In [12]:
# Loading the Transformed customer dataframe into the New sakila_2 Warehouse as a new dimension table (dim_customer)
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [13]:
# demonstrating successful newly loaded customer dimension table
# fulfills requirement for data originating from a local file system 
sql_customer = "SELECT * FROM sakila_2.dim_customer;"
df_dim_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_dim_customer.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,active,create_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36


In [14]:
# demonstrating successful newly loaded rental dimension table
# fulfills requirement for data originating from a relational database 
sql_rental = "SELECT * FROM sakila_2.dim_rental;"
df_dim_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_dim_rental.head(2)

Unnamed: 0,rental_key,rental_date,customer_key,inventory_key,return_date
0,1,2005-05-24 22:53:30,130,367,2005-05-26 22:04:30
1,2,2005-05-24 22:54:33,459,1525,2005-05-28 19:40:33


In [15]:
# demonstrating successful newly loaded film_actor dimension table
# fulfills requirement for 1/2 additional dimension table 
sql_film_actor = "SELECT * FROM sakila_2.dim_film_actor;"
df_dim_film_actor = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film_actor)
df_dim_film_actor.head(2)

Unnamed: 0,actor_key,film_key,last_update,film_actor_key
0,1,1,2006-02-15 00:05:03,1
1,1,23,2006-02-15 00:05:03,2


In [16]:
# demonstrating successful newly loaded film dimension table
# fulfills requirement for 2/2 additional dimension table 
sql_film = "SELECT * FROM sakila_2.dim_film;"
df_dim_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film)
df_dim_film.head(2)

Unnamed: 0,film_key,title,description,release_year,language_key,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


In [17]:
# Declaring & Assigning Connection Variables for my MongoDB server/cluster, 
# the MySQL Server, sakila source database & sakila_2 destination database
mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "clustermp.kjxd3wh"
atlas_user_name = "vup7bv"
atlas_password = "admin2131"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila"
dst_dbname = "sakila_2"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://vup7bv:admin2131@clustermp.kjxd3wh.mongodb.net


In [18]:
# Defining specific functions for getting and setting data Into the sakila_2 dataframe
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [19]:
# Populating MongoDB with data sourced from local inventory json file sourced from the sakila database
# this will be used to create the dim_inventory table 
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"inventory" : 'sakila_inventory.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()           

In [20]:
# Extracting Data from the Source MongoDB inventory collection Into an inventory dataframe 
query = {} # Selecting all columns and rows
collection = "inventory"

df_inventory = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)  # Specify 'atlas', or 'local'
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 00:09:17
1,2,1,1,2006-02-15 00:09:17


In [21]:
# Performing necessary transformations to the inventory dataframe such as 
# dropping unnecessary columns ('store_id')
# renaming columns to conform with data warehouse design standards
drop_cols = ['store_id']
df_inventory.drop(drop_cols, axis=1, inplace=True)
df_inventory.rename(columns={"inventory_id":"inventory_key", "film_id":"film_key"}, inplace=True)


df_inventory.head(2)

Unnamed: 0,inventory_key,film_key,last_update
0,1,1,2006-02-15 00:09:17
1,2,1,2006-02-15 00:09:17


In [22]:
# Loading the Transformed inventory dataframe into the New sakila_2 Warehouse as a new dimension table (dim_inventory)
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [23]:
# demonstrating successful newly loaded inventory dimension table
# fulfills requirement for data originating from a NoSQL database 
sql_inventory = "SELECT * FROM sakila_2.dim_inventory;"
df_dim_inventory = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_inventory)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,film_key,last_update
0,1,1,2006-02-15 00:09:17
1,2,1,2006-02-15 00:09:17


In [24]:
# Declaring & Assigning Connection Variables for the MySQL Server & the sakila and sakila_2 Databases 
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_2"

In [25]:
# Define the get_dataframe and set_dataframe functions for Getting and Setting Data, 
# now working mostly with sakila_2 and the newly created dimension tables 

def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [26]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

In [27]:
# crafting the fact_rentals table using Pandas DataFrames by querying the sakila_2 database with the  
# newly created dimension tables (dim_film, dim_customer, dim_rental, dim_inventory)
# Get all the data from each of the four dim tables involved
sql_film = "SELECT * FROM sakila_2.dim_film;"
df_dim_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film)
df_dim_film.head(2)

Unnamed: 0,film_key,title,description,release_year,language_key,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


In [28]:
# Get all the data from each of the four dim tables involved
sql_customer = "SELECT * FROM sakila_2.dim_customer;"
df_dim_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_dim_customer.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,active,create_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36


In [29]:
# Get all the data from each of the four dim tables involved
sql_rental = "SELECT * FROM sakila_2.dim_rental;"
df_dim_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_dim_rental.head(2)

Unnamed: 0,rental_key,rental_date,customer_key,inventory_key,return_date
0,1,2005-05-24 22:53:30,130,367,2005-05-26 22:04:30
1,2,2005-05-24 22:54:33,459,1525,2005-05-28 19:40:33


In [30]:
# Get all the data from each of the four dim tables involved
sql_inventory = "SELECT * FROM sakila_2.dim_inventory;"
df_dim_inventory = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_inventory)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,film_key,last_update
0,1,1,2006-02-15 00:09:17
1,2,1,2006-02-15 00:09:17


In [31]:
# using the merge( ) method to inner join the rental and the inventory dataframes on the inventory_key column
df_dim_rental = pd.merge(df_dim_rental, df_dim_inventory, on='inventory_key', how='inner')
df_dim_rental.rename(columns={"film_key_x":"film_key"}, inplace=True)
df_dim_rental.head(2)

Unnamed: 0,rental_key,rental_date,customer_key,inventory_key,return_date,film_key,last_update
0,1,2005-05-24 22:53:30,130,367,2005-05-26 22:04:30,80,2006-02-15 00:09:17
1,1576,2005-06-16 04:03:28,327,367,2005-06-24 22:40:28,80,2006-02-15 00:09:17


In [32]:
# using the merge( ) method to inner join the resulting rental and the film dataframes on the film_key column
df_dim_rental = pd.merge(df_dim_rental, df_dim_film, on='film_key', how='inner')
df_dim_rental.rename(columns={"customer_key_x":"customer_key"}, inplace=True)
df_dim_rental.head(2)

Unnamed: 0,rental_key,rental_date,customer_key,inventory_key,return_date,film_key,last_update_x,title,description,release_year,language_key,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_y
0,1,2005-05-24 22:53:30,130,367,2005-05-26 22:04:30,80,2006-02-15 00:09:17,BLANKET BEVERLY,A Emotional Documentary of a Student And a Gir...,2006,1,7,2.99,148,21.99,G,Trailers,2006-02-15 00:03:42
1,1576,2005-06-16 04:03:28,327,367,2005-06-24 22:40:28,80,2006-02-15 00:09:17,BLANKET BEVERLY,A Emotional Documentary of a Student And a Gir...,2006,1,7,2.99,148,21.99,G,Trailers,2006-02-15 00:03:42


In [33]:
# to create fact_rentals table, use merge( ) method to inner join the resulting rental and customer dataframe on customer_key
df_fact_rentals = pd.merge(df_dim_rental, df_dim_customer, on='customer_key', how='inner')
df_fact_rentals.head(5)

Unnamed: 0,rental_key,rental_date,customer_key,inventory_key,return_date,film_key,last_update_x,title,description,release_year,...,rating,special_features,last_update_y,store_key,first_name,last_name,email,address_key,active,create_date
0,1,2005-05-24 22:53:30,130,367,2005-05-26 22:04:30,80,2006-02-15 00:09:17,BLANKET BEVERLY,A Emotional Documentary of a Student And a Gir...,2006,...,G,Trailers,2006-02-15 00:03:42,1,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,134,1,2006-02-14 22:04:36
1,9720,2005-07-31 08:33:08,130,518,2005-08-08 04:50:08,114,2006-02-15 00:09:17,CAMELOT VACATION,A Touching Character Study of a Woman And a Wa...,2006,...,NC-17,"Trailers,Commentaries,Deleted Scenes,Behind th...",2006-02-15 00:03:42,1,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,134,1,2006-02-14 22:04:36
2,11807,2005-08-17 11:59:18,130,195,2005-08-18 09:13:18,43,2006-02-15 00:09:17,ATLANTIS CAUSE,A Thrilling Yarn of a Feminist And a Hunter wh...,2006,...,G,Behind the Scenes,2006-02-15 00:03:42,1,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,134,1,2006-02-14 22:04:36
3,7725,2005-07-28 04:56:33,130,492,2005-07-31 07:54:33,109,2006-02-15 00:09:17,BUTTERFLY CHOCOLAT,A Fateful Story of a Girl And a Composer who m...,2006,...,G,Behind the Scenes,2006-02-15 00:03:42,1,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,134,1,2006-02-14 22:04:36
4,2533,2005-06-19 01:39:04,130,901,2005-06-28 01:33:04,200,2006-02-15 00:09:17,CURTAIN VIDEOTAPE,A Boring Reflection of a Dentist And a Mad Cow...,2006,...,PG-13,"Trailers,Commentaries,Deleted Scenes,Behind th...",2006-02-15 00:03:42,1,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,134,1,2006-02-14 22:04:36


In [34]:
# Geting the Data from the sakila_2 Date Dimension Table, created using Lab_02c_Create_Populate_Dim_Date.sql
# using get_dataframe function to get the Surrogate Primary date_key and the Business Key (full_date) from date_dim
# casting the full_date column to the datetime64 data type
sql_dim_date = "select date_key, full_date from sakila_2.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [35]:
# Lookup the Surrogate Primary Key (rental_date_key) that Corresponds to the "rental_date" Column for fact_rentals
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rentals.rental_date = df_fact_rentals.rental_date.astype('datetime64').dt.date.astype('datetime64')
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_rental_date, on='rental_date', how='left')
df_fact_rentals.drop(['rental_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

  df_fact_rentals.rental_date = df_fact_rentals.rental_date.astype('datetime64').dt.date.astype('datetime64')


Unnamed: 0,rental_key,customer_key,inventory_key,return_date,film_key,last_update_x,title,description,release_year,language_key,...,special_features,last_update_y,store_key,first_name,last_name,email,address_key,active,create_date,rental_date_key
0,1,130,367,2005-05-26 22:04:30,80,2006-02-15 00:09:17,BLANKET BEVERLY,A Emotional Documentary of a Student And a Gir...,2006,1,...,Trailers,2006-02-15 00:03:42,1,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,134,1,2006-02-14 22:04:36,20050524
1,9720,130,518,2005-08-08 04:50:08,114,2006-02-15 00:09:17,CAMELOT VACATION,A Touching Character Study of a Woman And a Wa...,2006,1,...,"Trailers,Commentaries,Deleted Scenes,Behind th...",2006-02-15 00:03:42,1,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,134,1,2006-02-14 22:04:36,20050731


In [36]:
# Lookup the Surrogate Primary Key (return_date_key) that Corresponds to the "return_date" Column
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rentals.return_date = df_fact_rentals.return_date.astype('datetime64').dt.date.astype('datetime64')
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_return_date, on='return_date', how='left')
df_fact_rentals.drop(['return_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

  df_fact_rentals.return_date = df_fact_rentals.return_date.astype('datetime64').dt.date.astype('datetime64')


Unnamed: 0,rental_key,customer_key,inventory_key,film_key,last_update_x,title,description,release_year,language_key,rental_duration,...,last_update_y,store_key,first_name,last_name,email,address_key,active,create_date,rental_date_key,return_date_key
0,1,130,367,80,2006-02-15 00:09:17,BLANKET BEVERLY,A Emotional Documentary of a Student And a Gir...,2006,1,7,...,2006-02-15 00:03:42,1,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,134,1,2006-02-14 22:04:36,20050524,20050526.0
1,9720,130,518,114,2006-02-15 00:09:17,CAMELOT VACATION,A Touching Character Study of a Woman And a Wa...,2006,1,3,...,2006-02-15 00:03:42,1,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,134,1,2006-02-14 22:04:36,20050731,20050808.0


In [37]:
# updating the dim_rental table with the Surrogate Primary keys (rental_date_key, return_date_key) 
# that corresponds to the "rental_date" and "return_date" columns
# this is necessary for the final SQL solution 
sql_rental = "SELECT * FROM sakila_2.dim_rental;"
df_dim_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_dim_rental.rental_date = df_dim_rental.rental_date.astype('datetime64').dt.date.astype('datetime64')
df_dim_rental = pd.merge(df_dim_rental, df_dim_rental_date[['rental_date', 'rental_date_key']], on='rental_date', how='left')
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_dim_rental.return_date = df_dim_rental.return_date.astype('datetime64').dt.date.astype('datetime64')
df_dim_rental = pd.merge(df_dim_rental, df_dim_return_date[['return_date', 'return_date_key']], on='return_date', how='left')
df_dim_rental.head(5)

  df_dim_rental.rental_date = df_dim_rental.rental_date.astype('datetime64').dt.date.astype('datetime64')
  df_dim_rental.return_date = df_dim_rental.return_date.astype('datetime64').dt.date.astype('datetime64')


Unnamed: 0,rental_key,rental_date,customer_key,inventory_key,return_date,rental_date_key,return_date_key
0,1,2005-05-24,130,367,2005-05-26,20050524,20050526.0
1,2,2005-05-24,459,1525,2005-05-28,20050524,20050528.0
2,3,2005-05-24,408,1711,2005-06-01,20050524,20050601.0
3,4,2005-05-24,333,2452,2005-06-03,20050524,20050603.0
4,5,2005-05-24,222,2079,2005-06-02,20050524,20050602.0


In [38]:
# writing the newly updated dim_rentals table Back to the sakila_2 database 
table_name = "dim_rental"
primary_key = "rental_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_dim_rental, table_name, primary_key, db_operation)

sql_rental = "SELECT * FROM sakila_2.dim_rental;"
df_dim_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_dim_rental.head(5)

Unnamed: 0,rental_key,rental_date,customer_key,inventory_key,return_date,rental_date_key,return_date_key
0,1,2005-05-24,130,367,2005-05-26,20050524,20050526.0
1,2,2005-05-24,459,1525,2005-05-28,20050524,20050528.0
2,3,2005-05-24,408,1711,2005-06-01,20050524,20050601.0
3,4,2005-05-24,333,2452,2005-06-03,20050524,20050603.0
4,5,2005-05-24,222,2079,2005-06-02,20050524,20050602.0


In [39]:
# preparing the fact_rentals table so that it defines exactly what we want to see created, transformations include 
# dropping unwanted columns that are not needed in a fact table (eg. title, description)
# renaming certain ambiguous columns (customer_address_key) 
# reordering the columns to better allow for analysis of the rentals business process
# creating a new column (fact_rentals_key) to serve as the primary key for fact_rentals
drop_columns = [ 'first_name', 'last_name', 'email',
                 'active', 'last_update_x', 'last_update_y', 'title', 'description', 'special_features', 'rating'
               , 'create_date', 'release_year', 'length']
df_fact_rentals.drop(drop_columns, axis=1, inplace=True)

df_fact_rentals.rename(columns={"replacement_cost_x":"replacement_cost", "rental_rate_x":"rental_rate", 
                                "address_key":"customer_address_key"
                              }, inplace=True)

ordered_columns = ["rental_key", "rental_date_key", "return_date_key", "customer_key", "film_key", 
                  "inventory_key", "language_key", "rental_duration", "rental_rate", "replacement_cost", "store_key",
                  "customer_address_key"]
df_fact_rentals = df_fact_rentals[ordered_columns]

df_fact_rentals.insert(0, "fact_rentals_key", range(1, df_fact_rentals.shape[0]+1))
df_fact_rentals.head(5)

Unnamed: 0,fact_rentals_key,rental_key,rental_date_key,return_date_key,customer_key,film_key,inventory_key,language_key,rental_duration,rental_rate,replacement_cost,store_key,customer_address_key
0,1,1,20050524,20050526.0,130,80,367,1,7,2.99,21.99,1,134
1,2,9720,20050731,20050808.0,130,114,518,1,3,0.99,26.99,1,134
2,3,11807,20050817,20050818.0,130,43,195,1,6,2.99,15.99,1,134
3,4,7725,20050728,20050731.0,130,109,492,1,3,0.99,17.99,1,134
4,5,2533,20050619,20050628.0,130,200,901,1,7,0.99,27.99,1,134


In [40]:
# writing the new fact_rentals dataframe Back to the sakila_2 database 
table_name = "fact_rentals"
primary_key = "fact_rentals_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_rentals, table_name, primary_key, db_operation)

In [41]:
#SQL QUERY 1 to demonstrate proper functionality of the fact_rentals, dim_rental, and dim_customer tables
# IDENTIFY OUR 10 MOST FREQUENT CUSTOMERS, WHEN THEY LAST RENTED SOMETHING, AND HOW MUCH THEY HAVE SPENT TOTAL
# These SQL select statements return a solution that identifies customers by their customer_key & last name
# and shows the date of their most recent rental, their total number of rentals, and the total amount 
# they have spent on rentals. This is ordered by total number of rentals in descending order to show the 
# most loyal (and target) customers first on the list 
sql_test = """
 select
    customer.customer_key,
    customer.last_name AS "Customer Last Name",
    (
        select dim_rental.rental_date
        from `{0}`.dim_rental AS dim_rental
        where dim_rental.customer_key = customer.customer_key
        order by dim_rental.rental_date_key DESC
        limit 1
    ) AS "Most Recent Rental Date",
    COUNT(fr.customer_key) AS "Total Number of Rentals",
    SUM(fr.rental_rate) AS "Total Amount Spent"
from `{0}`.fact_rentals AS fr
inner join `{0}`.dim_customer AS customer
ON fr.customer_key = customer.customer_key
group by customer.customer_key, customer.last_name
order by 4 desc;""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head(10)

Unnamed: 0,customer_key,Customer Last Name,Most Recent Rental Date,Total Number of Rentals,Total Amount Spent
0,259,JENSEN,2005-08-23,14,47.86
1,197,PETERS,2005-08-23,12,37.88
2,204,SCHMIDT,2005-08-22,12,39.88
3,141,REYES,2005-08-23,12,43.88
4,25,WALKER,2005-08-23,12,33.88
5,114,ELLIS,2006-02-14,12,41.88
6,220,ALVAREZ,2005-08-23,11,36.89
7,302,SILVERMAN,2005-08-23,11,26.89
8,279,SHELTON,2006-02-14,11,36.89
9,447,BOWENS,2005-08-23,11,24.89
