## DS 2002 Midterm Project
The project focuses on creating a data warehouse for a sample SQL dataset, sakila. It follows the ETL process, extracting data from MySQL, locally, and MongoDB, transforming the data, and loading it into dimension tables and a fact table.

#### 1.0 Importing Necessary Libraries

In [1]:
# importing libraries
import os
import json
import numpy
import datetime
import pandas as pd
import certifi

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

#### 1.1 Defining parameters for connecting to SQL/MongoDB

In [2]:
mysql_args = {
    "uid" : "root",
    #"pwd" : "", # removed my password for final submission
    "hostname" : "localhost",
    "dbname" : "sakila"
}

src_dbname = "sakila"
dst_dbname = "sakila_dw"

mongodb_args = {
    "user_name" : "kylesethtran",
    "password" : "Password",
    "cluster_name" : "mongodbintro",
    "cluster_subnet" : "gltgp",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila"
}

#### 1.2 Defining Methods for Getting Data from SQL/MongoDB (Sourced from Lab 4)

In [3]:
# Here, I am referencing the methods provided in Lab 4 to allow us to get/set SQL tables easily, as well as connect to MongoDB easily
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    
def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client

def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### 1.3 Creating Empty Data Warehouse

In [4]:
# Creating Empty Data Warehouse for the sakila database
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

#### 1.4 Creating Date Dimension

In [5]:
# After creating the empty data warehouse, I ran the script from Lab 2C to create the date dimensions table for the fact table later
# Here, we are verifying the dim_date table has been created
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


### 2.0 Extracting Data into DataFrames

#### 2.1 Extract Customer and Rental Dimensions from MySQL Database

In [6]:
# Here, I am extracting data from the customer table from SQL into a datafeame
sql_customers = "SELECT * FROM sakila.customer"
df_customers = get_sql_dataframe(sql_customers, **mysql_args)
df_customers.head()

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [7]:
# Here, I am extracting data from the rental table from SQL into a dataframe
sql_rentals = "SELECT * FROM sakila.rental"
df_rentals = get_sql_dataframe(sql_rentals, **mysql_args)
df_rentals.head()

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53
2,3,2005-05-24 23:03:39,1711,408,2005-06-01 22:12:39,1,2006-02-15 21:30:53
3,4,2005-05-24 23:04:41,2452,333,2005-06-03 01:43:41,2,2006-02-15 21:30:53
4,5,2005-05-24 23:05:21,2079,222,2005-06-02 04:33:21,1,2006-02-15 21:30:53


#### 2.2 Populating Film Data and Inventory Data into MongoDB and Extracting Film Data from MongoDB

In [8]:
## Populating Film and Inventory Dimension
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

# Specifying the json files that will be loaded into the MongoDb collections
# The json files were created from the MySQL database
json_files = {"film" : 'sakila-film.json',
              "inventory": 'sakila-inventory.json'}

# Here, we are loading in the json files to the MongoDB cluster database
set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)  

In [9]:
# Extracting Film Data from MongoDb collection
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "film"

# Here, we are extracting the film collection from the MongoDB database
df_films = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_films.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [10]:
# Extracting Inventory Data from MongoDB collection
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "inventory"

# Here, we are extracting the inventory data from the MongoDB collection
df_inventory = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


#### 2.3 Extracting Staff Data Locally (CSV File)

In [11]:
# Extracting data from staff csv file
staff_dir = os.path.join(os.getcwd(), 'data/sakila_staff.csv')

# Here, we are extracting data from a csv file containing data on the 'staff' entity
# The csv was generated the same way the json files were, using the original MySQL database
df_staff = pd.read_csv(staff_dir)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


### 3.0 Transforming the DataFrames to create Dimension Tables

#### 3.1.1 Getting Surrogate Keys from Date Dimension Table

In [12]:
# Here, we are getting the surrogate keys for the dim_date table
sql_dim_date = "SELECT date_key, full_date FROM northwind_dw2.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


#### 3.1.2 Lookup the Surrogate Primary Key (date_key) that corresponds to the create_date, last_date columns in Customers DataFrame

In [13]:
# Here, we are merging the create_day_key into the customers dataframe
df_dim_create_date = df_dim_date.rename(columns={"date_key" : "create_date_key", "full_date" : "create_date"})
df_customers.create_date = df_customers.create_date.astype('datetime64[ns]').dt.date
df_customers = pd.merge(df_customers, df_dim_create_date, on='create_date', how='left')
df_customers.drop(['create_date'], axis=1, inplace=True)
df_customers.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,last_update,create_date_key
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-15 04:57:20,20060214
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-15 04:57:20,20060214


In [14]:
# Here, we are merging the last_update_key into the customers dataframe
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_customers.last_update = df_customers.last_update.astype('datetime64[ns]').dt.date
df_customers = pd.merge(df_customers, df_dim_last_update, on='last_update', how='left')
df_customers.drop(['last_update'], axis=1, inplace=True)
df_customers.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date_key,last_update_key
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,20060214,20060215
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,20060214,20060215


In [15]:
# Here, we are renaming some columns and adding a key for the customers dataframe
df_customers.rename(columns={"create_date_key":"customer_create_date_key", "last_update_key":"customer_last_update_key"}, inplace=True)

df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))
df_customers.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email,address_id,active,customer_create_date_key,customer_last_update_key
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,20060214,20060215
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,20060214,20060215


#### 3.1.3 Lookup the Surrogate Primary Key (date_key) that corresponds to the rental_date, return_date, last_update columns in Rental DataFrame

In [16]:
# Updating rental_date to use date_key format
# Here, we are updating the rental_date column
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_rentals.rental_date = df_rentals.rental_date.astype('datetime64[ns]').dt.date
df_rentals = pd.merge(df_rentals, df_dim_rental_date, on='rental_date', how='left')
df_rentals.drop(['rental_date'], axis=1, inplace=True)
df_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,return_date,staff_id,last_update,rental_date_key
0,1,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,20050524
1,2,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,20050524


In [20]:
# Updating return_date to use date_key format
# Here, we are updating the return day column
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_rentals.return_date = df_rentals.return_date.astype('datetime64[ns]').dt.date
df_rentals = pd.merge(df_rentals, df_dim_return_date, on='return_date', how='left')
df_rentals.drop(['return_date'], axis=1, inplace=True)
df_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,last_update,rental_date_key,return_date_key
0,1,367,130,1,2006-02-15 21:30:53,20050524,20050526.0
1,2,1525,459,1,2006-02-15 21:30:53,20050524,20050528.0


In [17]:
# Updating last_update to use date_key format
# Here, we are updating the last update column
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_rentals.last_update = df_rentals.last_update.astype('datetime64[ns]').dt.date
df_rentals = pd.merge(df_rentals, df_dim_last_update, on='last_update', how='left')
df_rentals.drop(['last_update'], axis=1, inplace=True)
df_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,return_date,staff_id,rental_date_key,last_update_key
0,1,367,130,2005-05-26 22:04:30,1,20050524,20060215
1,2,1525,459,2005-05-28 19:40:33,1,20050524,20060215


In [18]:
# Making final transformations for rental dataframe
# Here, we are renaming columns and inserting a new primary key into the rentals dataframe
df_rentals.rename(columns={"return_date_key":"rental_return_date_key", "last_update_key":"rental_last_update_key"}, inplace=True)

df_rentals.insert(0, "rental_key", range(1, df_rentals.shape[0]+1))
df_rentals.head(2)

Unnamed: 0,rental_key,rental_id,inventory_id,customer_id,return_date,staff_id,rental_date_key,rental_last_update_key
0,1,1,367,130,2005-05-26 22:04:30,1,20050524,20060215
1,2,2,1525,459,2005-05-28 19:40:33,1,20050524,20060215


#### 3.1.4 Lookup the Surrogate Primary Key (date_key) that corresponds to the last_update column in Film DataFrame

In [19]:
# Updating last_update to use date_key format
# Here, we are updating the last update column
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_films.last_update = df_films.last_update.astype('datetime64[ns]').dt.date
df_films = pd.merge(df_films, df_dim_last_update, on='last_update', how='left')
df_films.drop(['last_update'], axis=1, inplace=True)
df_films.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_key
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",20060215
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",20060215


In [20]:
# Making final transformations for film dataframe
# Here, we are renaming some column(s) and inserting a new primary key
df_films.rename(columns={"last_update_key":"film_last_update_key"}, inplace=True)

df_films.insert(0, "film_key", range(1, df_films.shape[0]+1))
df_films.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,film_last_update_key
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",20060215
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",20060215


#### 3.1.5 Lookup the Surrogate Primary Key (date_key) that corresponds to the last_update column in Staff DataFrame

In [21]:
# Updating last_update to use date_key format
# Here, we are updating the last_date column
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_staff.last_update = df_staff.last_update.astype('datetime64[ns]').dt.date
df_staff = pd.merge(df_staff, df_dim_last_update, on='last_update', how='left')
df_staff.drop(['last_update'], axis=1, inplace=True)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update_key
0,1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,20060215
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,20060215


In [22]:
# Making final transformations for staff dataframe
# Here, we are renaming column(s), dropping column(s), and inserting a new primary key
df_staff.rename(columns={"last_update_key":"staff_last_update_key"}, inplace=True)

drop_columns = ["picture", "password"]
df_staff.drop(drop_columns, axis=1, inplace = True)

df_staff.insert(0, "staff_key", range(1, df_staff.shape[0]+1))
df_staff.head(2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,address_id,email,store_id,active,username,staff_last_update_key
0,1,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,20060215
1,2,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,20060215


#### 3.1.5 Lookup the Surrogate Primary Key (date_key) that corresponds to the last_update column in Inventory DataFrame

In [23]:
# Updating last_update to use date_key format
# Here, we are updating the last_update column
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_inventory.last_update = df_inventory.last_update.astype('datetime64[ns]').dt.date
df_inventory = pd.merge(df_inventory, df_dim_last_update, on='last_update', how='left')
df_inventory.drop(['last_update'], axis=1, inplace=True)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update_key
0,1,1,1,20060215
1,2,1,1,20060215


In [24]:
# Making final transformations for inventory dataframe
# We are renaming column(s) and inserting a new primary key
df_inventory.rename(columns={"last_update_key":"inventory_last_update_key"}, inplace=True)

df_inventory.insert(0, "inventory_key", range(1, df_inventory.shape[0]+1))
df_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id,inventory_last_update_key
0,1,1,1,1,20060215
1,2,2,1,1,20060215


#### 3.2.1 Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [27]:
# Here, we are defining which dimensions are going to be loaded into the new data warehouse
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_rentals', df_rentals, 'rental_key'),
          ('dim_films', df_films, 'film_key'),
          ('dim_staff', df_staff, 'staff_key'),
          ('dim_inventory', df_inventory, 'inventory_key')]

In [28]:
# 1. updating dbname parameter to use location of new data warehouse
mysql_args["dbname"] = "sakila_dw"
# 2. insert each dimension table into salika_dw
for table_name, dataframe, primary_key in tables:
    set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### 3.2.2 Validate the New Dimension Tables were Created

In [29]:
# here, we are checking that the customer dimensions table was created by querying from it
sql_customers = "SELECT * FROM sakila_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_customers, **mysql_args)
df_dim_customers.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email,address_id,active,customer_create_date_key,customer_last_update_key
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,20060214,20060215
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,20060214,20060215


In [30]:
# here, we are checking that the rental dimensions table was created by querying from it
sql_rentals = "SELECT * FROM sakila_dw.dim_rentals;"
df_dim_rentals = get_sql_dataframe(sql_rentals, **mysql_args)
df_dim_rentals.head(2)

Unnamed: 0,rental_key,rental_id,inventory_id,customer_id,return_date,staff_id,rental_date_key,rental_last_update_key
0,1,1,367,130,2005-05-26 22:04:30,1,20050524,20060215
1,2,2,1525,459,2005-05-28 19:40:33,1,20050524,20060215


In [31]:
# here, we are checking that the films dimensions table was created by querying from it 
sql_films = "SELECT * FROM sakila_dw.dim_films;"
df_dim_films = get_sql_dataframe(sql_films, **mysql_args)
df_dim_films.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,film_last_update_key
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",20060215
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",20060215


In [32]:
# here, we are checking that the staff dimensions table was created by querying from it
sql_staff = "SELECT * FROM sakila_dw.dim_staff;"
df_dim_staff = get_sql_dataframe(sql_staff, **mysql_args)
df_dim_staff.head(2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,address_id,email,store_id,active,username,staff_last_update_key
0,1,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,20060215
1,2,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,20060215


In [33]:
# here, we are checking that the inventory dimensions table was created by querying from it
sql_inventory = "SELECT * FROM sakila_dw.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_inventory, **mysql_args)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id,inventory_last_update_key
0,1,1,1,1,20060215
1,2,2,1,1,20060215


### 4.0 Create New Fact Table

#### 4.1 Extract Data from Source (Local CSV)

In [34]:
## Using the payment table as a foundation for fact table
# Extracting data from CSV (3rd Data Source)
payment_dir = os.path.join(os.getcwd(), 'data/sakila_payment.csv')

df_fact_payments = pd.read_csv(payment_dir)
df_fact_payments.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


#### 4.1 Look Up DateKeys from Date Dimension Table

In [35]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "payment_date" Column.
# Updating the format of date columns in payment df to use dim_date keys
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_payments.payment_date = df_fact_payments.payment_date.astype('datetime64[ns]').dt.date
df_fact_payments = pd.merge(df_fact_payments, df_dim_payment_date, on='payment_date', how='left')
df_fact_payments.drop(['payment_date'], axis=1, inplace=True)
df_fact_payments.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,last_update,payment_date_key
0,1,1,1,76,2.99,2006-02-15 22:12:30,20050525
1,2,1,1,573,0.99,2006-02-15 22:12:30,20050528


In [36]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "last_update" Column.
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_fact_payments.last_update = df_fact_payments.last_update.astype('datetime64[ns]').dt.date
df_fact_payments = pd.merge(df_fact_payments, df_dim_last_update, on='last_update', how='left')
df_fact_payments.drop(['last_update'], axis=1, inplace=True)
df_fact_payments.rename(columns={"last_update_key":"payment_last_update_key"}, inplace=True)
df_fact_payments.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date_key,payment_last_update_key
0,1,1,1,76,2.99,20050525,20060215
1,2,1,1,573,0.99,20050528,20060215


#### 4.2 Look Up Primary Keys from Dimensions Table

##### First, fetch the Surrogate Primary Key and the Business Key from each Dimension table.

In [37]:
# Extract the 'primary key' and the 'business key' from new "dim_customers" dimension table
sql_dim_customers = "SELECT customer_key, customer_id FROM sakila_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_dim_customers, **mysql_args)
df_dim_customers.head(2)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2


In [40]:
# Extract the 'primary key' and the 'business key' from new "dim_rentals" dimension table
sql_dim_rentals = "SELECT rental_key, rental_id FROM sakila_dw.dim_rentals;"
df_dim_rentals = get_sql_dataframe(sql_dim_rentals, **mysql_args)
df_dim_rentals.head(2)

Unnamed: 0,rental_key,rental_id
0,1,1
1,2,2


In [41]:
# Extract the 'primary key' and the 'business key' from new "dim_staff" dimension table
sql_dim_staff = "SELECT staff_key, staff_id FROM sakila_dw.dim_staff;"
df_dim_staff = get_sql_dataframe(sql_dim_staff, **mysql_args)
df_dim_staff.head(2)

Unnamed: 0,staff_key,staff_id
0,1,1
1,2,2


##### Next, use Business Keys to lookup the corresponding Surrogate Primary Keys in the Dimension tables

In [42]:
# Merge the "fact_payments" and "dim_customers" dataframes on the 'customer_id' to
# get the 'customer_key'. Then drop the 'customer_id' column and display the results.
df_fact_payments = pd.merge(df_fact_payments, df_dim_customers, on='customer_id', how='inner')
df_fact_payments.drop(['customer_id'], axis=1, inplace=True)
df_fact_payments.head(2)

Unnamed: 0,payment_id,staff_id,rental_id,amount,payment_date_key,payment_last_update_key,customer_key
0,1,1,76,2.99,20050525,20060215,1
1,2,1,573,0.99,20050528,20060215,1


In [43]:
# Merge the "fact_payments" and "dim_staff" dataframes on the 'staff_id' to
# get the 'staff_key'. Then drop the 'staff_id' column and display the results.
df_fact_payments = pd.merge(df_fact_payments, df_dim_staff, on='staff_id', how='inner')
df_fact_payments.drop(['staff_id'], axis=1, inplace=True)
df_fact_payments.head(2)

Unnamed: 0,payment_id,rental_id,amount,payment_date_key,payment_last_update_key,customer_key,staff_key
0,1,76,2.99,20050525,20060215,1,1
1,2,573,0.99,20050528,20060215,1,1


In [44]:
# Merge the "fact_payments" and "dim_rentals" dataframes on the 'rental_id' to
# get the 'rental_key'. Then drop the 'rental_id' column and display the results.
df_fact_payments = pd.merge(df_fact_payments, df_dim_rentals, on='rental_id', how='inner')
df_fact_payments.drop(['rental_id'], axis=1, inplace=True)
df_fact_payments.head(2)

Unnamed: 0,payment_id,amount,payment_date_key,payment_last_update_key,customer_key,staff_key,rental_key
0,1,2.99,20050525,20060215,1,1,76
1,2,0.99,20050528,20060215,1,1,572


#### 4.1 Perform any Additional Transformations

In [45]:
# Inserting a new "fact_payment_key" column, with an ever-incrementing 
# numeric value, to serve as the surrogate primary key. Then display the results.
df_fact_payments.insert(0, "fact_payment_key", range(1, df_fact_payments.shape[0]+1))
df_fact_payments.head(2)

Unnamed: 0,fact_payment_key,payment_id,amount,payment_date_key,payment_last_update_key,customer_key,staff_key,rental_key
0,1,1,2.99,20050525,20060215,1,1,76
1,2,2,0.99,20050528,20060215,1,1,572


#### 4.1 Load Newly Transformed Fact Table into the Sakila_DW Data Warehouse

In [46]:
dataframe = df_fact_payments
table_name = 'fact_payments'
primary_key = 'fact_payment_key'
db_operation = "insert"

# loading the new fact table into the sakila_dw 
set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### 4.2 Validate New Fact Table was Created

In [47]:
# validating the fact table was created by querying from it
sql_payments = "SELECT * FROM sakila_dw.fact_payments;"
df_fact_payments = get_sql_dataframe(sql_payments, **mysql_args)
df_fact_payments.head(2)

Unnamed: 0,fact_payment_key,payment_id,amount,payment_date_key,payment_last_update_key,customer_key,staff_key,rental_key
0,1,1,2.99,20050525,20060215,1,1,76
1,2,2,0.99,20050528,20060215,1,1,572


### 5.0 Demonstrate that the New Data Warehouse Exists and Contains the Correct Data

The first query I am going to make is counting the number of payments each customer makes and the total amount paid across all their purchases. In other words, this query returns the number of payments per customer and the total amount spent

In [48]:
# Querying the number of payments per customer and the total amount spent
sql_payments = """
    SELECT customers.`first_name` AS First_Name, 
        customers.`last_name` AS Last_Name,
        customers.`email` As Email,
        SUM(payments.`amount`) As Total_Amount,
        COUNT(*) AS Number_of_Payments
    FROM `{0}`.`fact_payments` AS payments
    INNER JOIN `{0}`.`dim_customers` AS customers
    ON payments.customer_key = customers.customer_key
    GROUP BY customers.`first_name`, customers.`last_name`, customers.`email`
    ORDER BY customers.`first_name`, customers.`last_name`
""".format("sakila_dw")

In [84]:
df_fact_payments = get_sql_dataframe(sql_payments, **mysql_args)
df_fact_payments

Unnamed: 0,First_Name,Last_Name,Email,Amount,Number_of_Payments
0,AMY,LOPEZ,AMY.LOPEZ@sakilacustomer.org,127.71,29
1,ANGELA,HERNANDEZ,ANGELA.HERNANDEZ@sakilacustomer.org,140.64,36
2,ANNA,HILL,ANNA.HILL@sakilacustomer.org,91.79,21
3,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,81.78,22
4,BETTY,WHITE,BETTY.WHITE@sakilacustomer.org,117.72,28
5,BRENDA,WRIGHT,BRENDA.WRIGHT@sakilacustomer.org,104.74,26
6,CAROL,GARCIA,CAROL.GARCIA@sakilacustomer.org,91.78,22
7,CYNTHIA,YOUNG,CYNTHIA.YOUNG@sakilacustomer.org,111.68,32
8,DEBORAH,WALKER,DEBORAH.WALKER@sakilacustomer.org,115.71,29
9,DONNA,THOMPSON,DONNA.THOMPSON@sakilacustomer.org,98.79,21


The second query I am going to make is to get the number of times a specific movie has been rented

In [60]:
# Querying the number of times a movie has been rented, joining fact_payments, dim_rentals, dim_films, dim_inventory
sql_rentals = """
    SELECT films.`title` AS Title,
        COUNT(*) as Number_Times_Rented
    FROM `{0}`.`fact_payments` AS payments
    INNER JOIN `{0}`.`dim_rentals` AS rentals
    ON payments.rental_key = rentals.rental_key
    INNER JOIN `{0}`.`dim_inventory` AS inventory
    ON inventory.`inventory_id` = rentals.`inventory_id`
    INNER JOIN `{0}`.`dim_films` AS films
    ON films.`film_id` = inventory.`film_id`
    GROUP BY films.`title`
    ORDER BY Number_Times_Rented DESC
""".format("sakila_dw")

In [61]:
rentals = get_sql_dataframe(sql_rentals, **mysql_args)
rentals

Unnamed: 0,Title,Number_Times_Rented
0,ARIZONA BANG,6
1,EMPIRE MALKOVICH,6
2,LOSE INCH,6
3,AMADEUS HOLY,5
4,CONFIDENTIAL INTERVIEW,5
...,...,...
589,WONDERLAND CHRISTMAS,1
590,WONKA SEA,1
591,WYOMING STORM,1
592,YOUNG LANGUAGE,1


The last query I am going to make is the number of payments a staff member has processed

In [64]:
# Querying the number of payments each staff member has processed
sql_staff = """
    SELECT staff.`first_name` AS First_Name, 
        staff.`last_name` AS Last_Name,
        COUNT(*) As Total_Number_Of_Payments_Processed
    FROM `{0}`.`fact_payments` AS payments
    INNER JOIN `{0}`.`dim_staff` AS staff
    ON payments.staff_key = staff.staff_key
    GROUP BY staff.`first_name`, staff.`last_name`
    ORDER BY staff.`first_name`, staff.`last_name`
""".format("sakila_dw")

In [65]:
staff = get_sql_dataframe(sql_staff, **mysql_args)
staff

Unnamed: 0,First_Name,Last_Name,Total_Number_Of_Payments_Processed
0,Jon,Stephens,473
1,Mike,Hillyer,527
