## Midterm Project - hpb2gv
This is my submission for the ETL data processor for DS 2002. My name is Kevin Cha (hpb2gv), and I worked with the "sakila" database - a sample database that models a movie rental system and actually comes with MySQL (it's also available on the website linked from the assignment writeup). A quick look at the schema shows that there are plenty of dependencies between tables, and even a couple of many-to-many relations (resolved through junction tables like film_category).

## Step 1: Initializing everything and getting the sources set up
This is some really basic code I ripped from the other labs to set up my connections + data sources.

In [48]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [49]:
#MySQL setup args
mysql_args = {
    "uid" : "root",
    "pwd" : "Password1234!",
    "hostname" : "localhost",
    "src_db" : "sakila",
    "dst_db" : "sakila2"
}

# MongoDB setup args
mongodb_args = {
    "user_name" : "hpb2gv",
    "password" : "iF8gBlzWg1h0jPkj",
    "cluster_name" : "testcluster",
    "cluster_subnet" : "x9rml",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila"
}

In [50]:
def get_sql_dataframe(sql_query, **args):
    '''Run sql_query against the source MySQL database and return the result as a DataFrame.'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['src_db']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    # Invoke pd.read_sql() to query the database and fill a Pandas DataFrame.
    dframe = pd.read_sql(text(sql_query), connection)
    connection.close()
    
    return dframe
    

def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for collection_name in json_files:
        # Start from a clean slate so re-running the notebook doesn't duplicate documents
        db.drop_collection(collection_name)
        json_file = os.path.join(data_directory, json_files[collection_name])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            db[collection_name].insert_many(json_object)
        
    mongo_client.close()
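
One thing to watch with the f-string connection strings above: a password containing characters like `@` or `/` would break the URL. Here's a minimal sketch of percent-encoding the credentials with the standard library first. The helper name `build_mysql_url` and the demo password are made up for illustration; they aren't part of the lab code.

```python
from urllib.parse import quote_plus


def build_mysql_url(uid, pwd, hostname, db_name):
    """Build a mysql+pymysql URL with percent-encoded credentials (hypothetical helper)."""
    return f"mysql+pymysql://{quote_plus(uid)}:{quote_plus(pwd)}@{hostname}/{db_name}"


# Made-up password containing '@' and '/', which would otherwise confuse URL parsing.
demo_url = build_mysql_url("root", "p@ss/word", "localhost", "sakila")
print(demo_url)  # mysql+pymysql://root:p%40ss%2Fword@localhost/sakila
```

The same `quote_plus` call should work for the user name and password in the MongoDB connection string.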

In [51]:
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{mysql_args['dst_db']}`;"))
connection.execute(text(f"CREATE DATABASE `{mysql_args['dst_db']}`;"))
connection.execute(text(f"USE {mysql_args['dst_db']};"))

connection.close()

Note that there MUST be a data folder in the same directory as this notebook containing the JSON files for this code to work. This code sets up the collections in the MongoDB cluster.

In [52]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_file = {"films" : 'sakila-film.json',
             "languages" : 'sakila-language.json'}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_file)
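
Since the cell above fails if the data folder or one of the JSON files is missing, a quick pre-flight check can save some confusion. This is just a sketch: `missing_data_files` is a hypothetical helper, and the demo runs against a throwaway temp directory rather than the real data folder.

```python
import os
import tempfile


def missing_data_files(data_directory, file_names):
    """Return the names in file_names that do not exist inside data_directory."""
    return [name for name in file_names
            if not os.path.isfile(os.path.join(data_directory, name))]


# Demo: a temporary directory that contains only one of the two expected files.
with tempfile.TemporaryDirectory() as demo_dir:
    open(os.path.join(demo_dir, "sakila-film.json"), "w").close()
    missing = missing_data_files(demo_dir, ["sakila-film.json", "sakila-language.json"])

print(missing)  # ['sakila-language.json']
```

In the notebook, the same function could be called with `data_dir` and `json_file.values()` before `set_mongo_collections`.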

## Step 2: Creating Dimension Tables
The base OLTP database didn't actually have any dimension tables, so it's time to do that! I'll be doing that by:
1. Grabbing data from a CSV and constructing a dimension table based off that data.
2. Grabbing data from a MongoDB collection (the same data that I initialized above) and constructing a dimension table based off that data.
3. Grabbing data from the remaining items from the initial OLTP database and constructing a dimension table based off that data.

## Step 2.0.5: Using Lab 2c to create the dim_date table!
Yeah, so at this point the Lab 2c code MUST have been run while pointing at sakila2 (and after the cell in Step 1 that drops and recreates sakila2, or dim_date gets wiped). Otherwise, I won't be able to include a date key for my customer data in Step 2.1!

## Step 2.1: CSV data -  Customer data
Customers are pretty important data for the purposes of this transactional process, so I'm going to be grabbing this data from a CSV. Note that the CSVs MUST be located in a folder in the same directory as this notebook for this to work (the data folder from before). I'll also be dealing with address + its dependencies (city, country) to make one big dim_customers.

In [53]:
# Grabbing the customer data
data_dir = os.path.join(os.getcwd(), 'data')
data_file_customer = os.path.join(data_dir, 'sakila-customer.csv')

df_customer = pd.read_csv(data_file_customer, header=0)
# Also I'm dropping the last_update columns for all of these because I don't find it relevant (plus store_id here)
df_customer.drop(['last_update', 'store_id'], axis=1, inplace=True)
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0]+1))
df_customer.head()

Unnamed: 0,customer_key,customer_id,first_name,last_name,email,address_id,active,create_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36
1,2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36
2,3,3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36
3,4,4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36
4,5,5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36


And for the record, I'm just going to be using a direct SQL query to grab dim_date from the sakila2 database for the purpose of the date key.

In [54]:
sql_dim_date = "SELECT date_key, full_date FROM sakila2.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head()

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02
2,20000103,2000-01-03
3,20000104,2000-01-04
4,20000105,2000-01-05


And thanks to the code that was provided from the previous lab, we can just do:

In [55]:
df_dim_account_creation_date = df_dim_date.rename(columns={"date_key" : "account_create_date_key", "full_date" : "create_date"})
df_customer.create_date = df_customer.create_date.astype('datetime64[ns]').dt.date
df_customer = pd.merge(df_customer, df_dim_account_creation_date, on='create_date', how='left')
df_customer.drop(['create_date'], axis=1, inplace=True)
df_customer.head()

Unnamed: 0,customer_key,customer_id,first_name,last_name,email,address_id,active,account_create_date_key
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,20060214
1,2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,20060214
2,3,3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,20060214
3,4,4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,20060214
4,5,5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,20060214


And that looks pretty good for the date key!
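
As a sanity check on the merge: judging from the dim_date rows shown earlier (20000101 next to 2000-01-01), date_key looks like the date written as a YYYYMMDD integer. Assuming that holds for the whole table, the expected key can be computed directly and compared against what the merge produced. `date_to_key` is a hypothetical helper, not something from Lab 2c.

```python
import datetime


def date_to_key(d):
    """Encode a date as a YYYYMMDD integer (assumed dim_date key format)."""
    return d.year * 10000 + d.month * 100 + d.day


# create_date value taken from the first customer row above.
created = datetime.datetime.strptime("2006-02-14 22:04:36", "%Y-%m-%d %H:%M:%S")
expected_key = date_to_key(created.date())
print(expected_key)  # 20060214
```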

In [56]:
# Grabbing the address data
data_file_address = os.path.join(data_dir, 'sakila-address.csv')

df_address = pd.read_csv(data_file_address, header=0)
# Also I'm dropping the last_update columns for all of these because I don't find it relevant
# A couple of the other columns are also either NaNs or not very relevant (the phone column seems like random values)
df_address.drop(['address2', 'last_update', 'phone', 'location'], axis=1, inplace=True)
df_address.head()

Unnamed: 0,address_id,address,district,city_id,postal_code
0,1,47 MySakila Drive,Alberta,300,
1,2,28 MySQL Boulevard,QLD,576,
2,3,23 Workhaven Lane,Alberta,300,
3,4,1411 Lillydale Drive,QLD,576,
4,5,1913 Hanoi Way,Nagasaki,463,35200.0
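
Notice that postal_code comes back as 35200.0: the empty values make pandas read the column as floats. If that's not wanted, one option is to read the column as text. The sketch below uses a tiny inline CSV (made-up layout with only three columns, and assuming the file stores the code as plain digits) instead of the real sakila-address.csv.

```python
import io

import pandas as pd

# Two illustrative rows: one with a missing postal code, one with a value.
sample_csv = io.StringIO(
    "address_id,address,postal_code\n"
    "1,47 MySakila Drive,\n"
    "5,1913 Hanoi Way,35200\n"
)

# Reading postal_code as str keeps '35200' as text instead of 35200.0.
df_sample = pd.read_csv(sample_csv, header=0, dtype={"postal_code": str})
print(df_sample)
```

Reading as text also preserves any leading zeros, which a float column would lose.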


In [57]:
# Grabbing the city data
data_file_city = os.path.join(data_dir, 'sakila-city.csv')

df_city = pd.read_csv(data_file_city, header=0)
# Also I'm dropping the last_update columns for all of these because I don't find it relevant
df_city.drop(['last_update'], axis=1, inplace=True)
df_city.head()

Unnamed: 0,city_id,city,country_id
0,1,A Coruña (La Coruña),87
1,2,Abha,82
2,3,Abu Dhabi,101
3,4,Acuña,60
4,5,Adana,97


In [58]:
# Grabbing the country data
data_file_country = os.path.join(data_dir, 'sakila-country.csv')

df_country = pd.read_csv(data_file_country, header=0)
# Also I'm dropping the last_update columns for all of these because I don't find it relevant
df_country.drop(['last_update'], axis=1, inplace=True)
df_country.head()

Unnamed: 0,country_id,country
0,1,Afghanistan
1,2,Algeria
2,3,American Samoa
3,4,Angola
4,5,Anguilla


Some small merging to make the addresses nicer:

In [59]:
df_city_and_country = pd.merge(df_city, df_country, on='country_id', how='inner')
df_address_with_info = pd.merge(df_address, df_city_and_country, on='city_id', how='inner')
df_address_with_info.drop(['city_id', 'country_id'], axis=1, inplace=True) # Removing the ids

df_address_with_info.head()

Unnamed: 0,address_id,address,district,postal_code,city,country
0,1,47 MySakila Drive,Alberta,,Lethbridge,Canada
1,2,28 MySQL Boulevard,QLD,,Woodridge,Australia
2,3,23 Workhaven Lane,Alberta,,Lethbridge,Canada
3,4,1411 Lillydale Drive,QLD,,Woodridge,Australia
4,5,1913 Hanoi Way,Nagasaki,35200.0,Sasebo,Japan
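
One caveat with `how='inner'`: an address whose city_id has no match is silently dropped. A rough way to check for that is a left merge with pandas' `indicator` and `validate` options, sketched here on made-up frames (city_id 999 is deliberately fake so that something fails to match).

```python
import pandas as pd

address = pd.DataFrame({"address_id": [1, 2, 3], "city_id": [300, 576, 999]})
city = pd.DataFrame({"city_id": [300, 576], "city": ["Lethbridge", "Woodridge"]})

# validate raises if city_id is duplicated on the right side;
# indicator adds a _merge column saying where each row came from.
checked = pd.merge(address, city, on="city_id", how="left",
                   validate="many_to_one", indicator=True)

unmatched = checked[checked["_merge"] == "left_only"]
print(unmatched[["address_id", "city_id"]])
```

If `unmatched` is empty on the real data, the inner merge above loses nothing.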


And now finalizing dim_customers:

In [60]:
df_customer_with_address = pd.merge(df_customer, df_address_with_info, on='address_id', how='inner')
df_customer_with_address.drop(['address_id'], axis=1, inplace=True)

df_customer_with_address.head()

Unnamed: 0,customer_key,customer_id,first_name,last_name,email,active,account_create_date_key,address,district,postal_code,city,country
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,20060214,1913 Hanoi Way,Nagasaki,35200.0,Sasebo,Japan
1,2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,20060214,1121 Loja Avenue,California,17886.0,San Bernardino,United States
2,3,3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1,20060214,692 Joliet Street,Attika,83579.0,Athenai,Greece
3,4,4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,1,20060214,1566 Inegl Manor,Mandalay,53561.0,Myingyan,Myanmar
4,5,5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,1,20060214,53 Idfu Parkway,Nantou,42399.0,Nantou,Taiwan


And now, pushing everything to the OLAP database:

In [61]:
db_operation = "insert"

tables = [('dim_customers', df_customer_with_address, 'customer_key')]

In [62]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(mysql_args['uid'], mysql_args['pwd'], mysql_args['hostname'], mysql_args['dst_db'], dataframe, table_name, primary_key, db_operation)

## Step 2.2: MongoDB data - Film Data
On top of just the Film entity I'll also be grabbing a separate "languages" table and JOINing the two so that the Film entity is a little bit more complete before throwing it into the OLAP database.

In [63]:
# Getting films collection
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "films"

df_films = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_films.drop(['last_update', 'original_language_id'], axis=1, inplace=True)
df_films.head()

Unnamed: 0,film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
2,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
3,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
4,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...,2006,1,6,2.99,130,22.99,G,Deleted Scenes


In [64]:
# Getting languages collection
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "languages"

df_languages = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_languages.drop(['last_update'], axis=1, inplace=True)
df_languages.head()

Unnamed: 0,language_id,name
0,1,English
1,2,Italian
2,3,Japanese
3,4,Mandarin
4,5,French


In [65]:
df_films_with_languages = pd.merge(df_films, df_languages, on='language_id', how='inner')
df_films_with_languages.drop(['language_id'], axis=1, inplace=True)
# Renaming the language column to language instead of name to be more descriptive
df_films_with_languages.rename(columns={"name":"language"}, inplace=True)
df_films_with_languages.insert(0, "film_key", range(1, df_films_with_languages.shape[0]+1))
df_films_with_languages.head()

Unnamed: 0,film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,language
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",English
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",English
2,3,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...,2006,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",English
3,4,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...,2006,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",English
4,5,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...,2006,6,2.99,130,22.99,G,Deleted Scenes,English


And that looks pretty good for our dim_films table! Time to push it to the OLAP database.

In [66]:
db_operation = "insert"

tables = [('dim_films', df_films_with_languages, 'film_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(mysql_args['uid'], mysql_args['pwd'], mysql_args['hostname'], mysql_args['dst_db'], dataframe, table_name, primary_key, db_operation)

## Step 2.3: Remaining OLTP dim data that we need directly from MySQL
We'll be grabbing the last bit of data we'll need to create all of our dim tables. The last thing to grab here is the staff and store information.

In [67]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_sql_dataframe(sql_staff, **mysql_args)
df_staff.head()

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [68]:
sql_store = "SELECT * FROM sakila.store;"
df_store = get_sql_dataframe(sql_store, **mysql_args)
df_store.head()

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-15 04:57:12
1,2,2,2,2006-02-15 04:57:12


Now, some small transformations to both of these (namely dropping irrelevant columns)

In [69]:
# Using the dataframe with the address info from section 2.1
df_staff_with_address = pd.merge(df_staff, df_address_with_info, on='address_id', how='inner')
df_staff_with_address.drop(['last_update', 'picture', 'address_id', 'store_id'], axis=1, inplace=True)
df_staff_with_address.insert(0, "staff_key", range(1, df_staff_with_address.shape[0]+1))
df_staff_with_address.head()

Unnamed: 0,staff_key,staff_id,first_name,last_name,email,active,username,password,address,district,postal_code,city,country
0,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,23 Workhaven Lane,Alberta,,Lethbridge,Canada
1,2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,Jon,,1411 Lillydale Drive,QLD,,Woodridge,Australia


In [70]:
# Grabbing the store data
# Also I'm dropping the last_update columns for all of these because I don't find it relevant
df_store.drop(['last_update', 'manager_staff_id'], axis=1, inplace=True)
df_store.insert(0, "store_key", range(1, df_store.shape[0]+1))
df_store_with_address = pd.merge(df_store, df_address_with_info, on='address_id', how='inner')
df_store_with_address.drop(['address_id'], axis=1, inplace=True)
df_store_with_address.head()

Unnamed: 0,store_key,store_id,address,district,postal_code,city,country
0,1,1,47 MySakila Drive,Alberta,,Lethbridge,Canada
1,2,2,28 MySQL Boulevard,QLD,,Woodridge,Australia


And those look pretty solid, so I'm going to throw them to our database now:

In [71]:
db_operation = "insert"

tables = [('dim_staff', df_staff_with_address, 'staff_key'),
          ('dim_store', df_store_with_address, 'store_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(mysql_args['uid'], mysql_args['pwd'], mysql_args['hostname'], mysql_args['dst_db'], dataframe, table_name, primary_key, db_operation)

## Step 3: Creating the dim_date table for use in creating the Fact table
Just run the code from Lab 2c, pointed at sakila2, here if it hasn't been run already (Step 2.0.5 needed the same dim_date table).

## Step 4: Creating the Fact Table
Gonna be doing this by grabbing payment + rental information here, then doing all my necessary key replacements from my dimensional dfs from before and whatnot.

In [72]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_sql_dataframe(sql_payment, **mysql_args)
df_payment.head()

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30
2,3,1,1,1185,5.99,2005-06-15 00:54:12,2006-02-15 22:12:30
3,4,1,2,1422,0.99,2005-06-15 18:02:53,2006-02-15 22:12:30
4,5,1,2,1476,9.99,2005-06-15 21:08:46,2006-02-15 22:12:30


In [73]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_sql_dataframe(sql_rental, **mysql_args)
df_rental.head()

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53
2,3,2005-05-24 23:03:39,1711,408,2005-06-01 22:12:39,1,2006-02-15 21:30:53
3,4,2005-05-24 23:04:41,2452,333,2005-06-03 01:43:41,2,2006-02-15 21:30:53
4,5,2005-05-24 23:05:21,2079,222,2005-06-02 04:33:21,1,2006-02-15 21:30:53


Now, just looking at this, a few things definitely have to be changed before we can make our overall fact_rentals table. We'll probably want some date keys with the dates we have here, and again the last_update column doesn't matter. I'm also going to be injecting the film_id and store_id from the inventory table with the inventory_id relation in the rentals table because it just makes so much more sense to me here for an OLAP setup!

In [74]:
df_payment.drop(['last_update'], axis=1, inplace=True)
df_payment.head()

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date
0,1,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,1,573,0.99,2005-05-28 10:35:23
2,3,1,1,1185,5.99,2005-06-15 00:54:12
3,4,1,2,1422,0.99,2005-06-15 18:02:53
4,5,1,2,1476,9.99,2005-06-15 21:08:46


In [75]:
df_rental.drop(['last_update'], axis=1, inplace=True)
df_rental.head()

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1
2,3,2005-05-24 23:03:39,1711,408,2005-06-01 22:12:39,1
3,4,2005-05-24 23:04:41,2452,333,2005-06-03 01:43:41,2
4,5,2005-05-24 23:05:21,2079,222,2005-06-02 04:33:21,1


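And now I'm just gonna go ahead and merge the two into one nice big fact table. Note that there will be some NaNs and NaTs in the table. Some of them are there because the customer has not made a payment yet for a rental, and that's fine in my mind! Others come from the merge keys: since staff_id is one of them, a payment taken by a different staff member than the one on the rental doesn't line up with it and lands on its own row with the rental columns empty (rows 1 and 2 in the output below are an example).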

In [76]:
df_fact_rentals = pd.merge(df_rental, df_payment, on=['rental_id', 'customer_id', 'staff_id'], how='outer')
df_fact_rentals.insert(0, "fact_rentals_key", range(1, df_fact_rentals.shape[0]+1))
df_fact_rentals.drop(['payment_id'], axis=1, inplace=True)
df_fact_rentals.head()

Unnamed: 0,fact_rentals_key,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,amount,payment_date
0,1,1,2005-05-24 22:53:30,367.0,130,2005-05-26 22:04:30,1,2.99,2005-05-24 22:53:30
1,2,2,2005-05-24 22:54:33,1525.0,459,2005-05-28 19:40:33,1,,NaT
2,3,2,NaT,,459,NaT,2,2.99,2005-05-24 22:54:33
3,4,3,2005-05-24 23:03:39,1711.0,408,2005-06-01 22:12:39,1,,NaT
4,5,3,NaT,,408,NaT,2,3.99,2005-05-24 23:03:39
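
In the output above, rental_id 2 shows up twice: once with staff_id 1 and no payment, once with staff_id 2 and no rental dates. Here's a small sketch of why that happens and one possible alternative, using made-up two-row frames and assuming at most one payment per rental. The `payment_staff_id` column name is my own invention.

```python
import pandas as pd

rental = pd.DataFrame({"rental_id": [1, 2], "customer_id": [130, 459], "staff_id": [1, 1]})
payment = pd.DataFrame({"rental_id": [1, 2], "customer_id": [130, 459],
                        "staff_id": [1, 2], "amount": [2.99, 2.99]})

# Same keys as the notebook: the staff mismatch on rental 2 splits it into two rows.
strict = pd.merge(rental, payment, on=["rental_id", "customer_id", "staff_id"], how="outer")

# Alternative: match on rental_id only and keep the payment's staff in its own column.
payment_by_rental = (payment.drop(columns=["customer_id"])
                            .rename(columns={"staff_id": "payment_staff_id"}))
loose = pd.merge(rental, payment_by_rental, on="rental_id", how="left")

print(len(strict), len(loose))  # 3 2
```

Whether the second form is the right grain for fact_rentals depends on whether the staff member who took the payment should be tracked separately.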


And like I said earlier, I'm going to grab the inventory table and merge that into this fact table so that we have some nice dependencies going on with our other dimensional tables:

In [77]:
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_sql_dataframe(sql_inventory, **mysql_args)
df_inventory.head()

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17
2,3,1,1,2006-02-15 05:09:17
3,4,1,1,2006-02-15 05:09:17
4,5,1,2,2006-02-15 05:09:17


In [78]:
df_fact_rentals = pd.merge(df_fact_rentals, df_inventory, on='inventory_id', how='left')
df_fact_rentals.drop(['last_update'], axis=1, inplace=True)
df_fact_rentals.head()

Unnamed: 0,fact_rentals_key,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,amount,payment_date,film_id,store_id
0,1,1,2005-05-24 22:53:30,367.0,130,2005-05-26 22:04:30,1,2.99,2005-05-24 22:53:30,80.0,1.0
1,2,2,2005-05-24 22:54:33,1525.0,459,2005-05-28 19:40:33,1,,NaT,333.0,2.0
2,3,2,NaT,,459,NaT,2,2.99,2005-05-24 22:54:33,,
3,4,3,2005-05-24 23:03:39,1711.0,408,2005-06-01 22:12:39,1,,NaT,373.0,2.0
4,5,3,NaT,,408,NaT,2,3.99,2005-05-24 23:03:39,,


## Step 4.1: Surrogate Primary Keys
And now I need to grab the surrogate keys from the dimensional tables I made earlier, so I'll go ahead and just query them directly using MySQL statements:

In [79]:
sql_dim_customers = "SELECT customer_key, customer_id FROM sakila2.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_dim_customers, **mysql_args)
df_dim_customers.head()

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2
2,3,3
3,4,4
4,5,5


In [80]:
sql_dim_staff = "SELECT staff_key, staff_id FROM sakila2.dim_staff;"
df_dim_staff = get_sql_dataframe(sql_dim_staff, **mysql_args)
df_dim_staff.head()

Unnamed: 0,staff_key,staff_id
0,1,1
1,2,2


In [81]:
sql_dim_films = "SELECT film_key, film_id FROM sakila2.dim_films;"
df_dim_films = get_sql_dataframe(sql_dim_films, **mysql_args)
df_dim_films.head()

Unnamed: 0,film_key,film_id
0,1,1
1,2,2
2,3,3
3,4,4
4,5,5


In [82]:
sql_dim_store = "SELECT store_key, store_id FROM sakila2.dim_store;"
df_dim_store = get_sql_dataframe(sql_dim_store, **mysql_args)
df_dim_store.head()

Unnamed: 0,store_key,store_id
0,1,1
1,2,2


Note that I did NOT get the df_dim_date again because I already did it above in section 2.1 when making a date key for customers. I will still be using it here again though.

And now, for the best and final part - replacing all the ids with the surrogate keys in the fact table.

In [None]:
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_customers, on='customer_id', how='left')
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_staff, on='staff_id', how='left')
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_films, on='film_id', how='left')
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_store, on='store_id', how='left')

# Dropping the old IDs that aren't relevant anymore
df_fact_rentals.drop(['customer_id', 'staff_id', 'film_id', 'store_id', 'inventory_id'], axis=1, inplace=True)

df_fact_rentals.head()

Unnamed: 0,fact_rentals_key,rental_id,rental_date,return_date,amount,payment_date,customer_key,staff_key,film_key,store_key
0,1,1,2005-05-24 22:53:30,2005-05-26 22:04:30,2.99,2005-05-24 22:53:30,130,1,80,1
1,2,2,2005-05-24 22:54:33,2005-05-28 19:40:33,,NaT,459,1,333,2
2,4,3,2005-05-24 23:03:39,2005-06-01 22:12:39,,NaT,408,1,373,2
3,7,4,2005-05-24 23:04:41,2005-06-03 01:43:41,,NaT,333,2,535,1
4,8,5,2005-05-24 23:05:21,2005-06-02 04:33:21,6.99,2005-05-24 23:05:21,222,1,450,2
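
The four merges above all follow the same pattern (look up the surrogate key by natural id, then drop the id), so they could be folded into one helper. A minimal sketch on made-up data follows; `swap_in_surrogate_key` is a hypothetical name, and the staff keys 10 and 20 are chosen to differ from the ids so the swap is visible.

```python
import pandas as pd


def swap_in_surrogate_key(fact, dim, natural_key, surrogate_key):
    """Left-merge the surrogate key onto fact, then drop the natural key column."""
    lookup = dim[[natural_key, surrogate_key]]
    merged = fact.merge(lookup, on=natural_key, how="left")
    return merged.drop(columns=[natural_key])


fact = pd.DataFrame({"rental_id": [1, 2], "staff_id": [1, 2]})
dim_staff = pd.DataFrame({"staff_key": [10, 20], "staff_id": [1, 2]})

result = swap_in_surrogate_key(fact, dim_staff, "staff_id", "staff_key")
print(result)
```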


Now, notice I didn't deal with the dim_date yet; it's time to deal with the rental_date, return_date, and payment_date all separately here:

In [38]:
# Rental_date
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rentals.rental_date = df_fact_rentals.rental_date.astype('datetime64[ns]').dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_rental_date, on='rental_date', how='left')
df_fact_rentals.drop(['rental_date'], axis=1, inplace=True)
df_fact_rentals.head()

Unnamed: 0,fact_rentals_key,rental_id,return_date,amount,payment_date,customer_key,staff_key,film_key,store_key,rental_date_key
0,7322.0,4863.0,2005-07-11 21:29:15,,NaT,431,2,1,1,20050708
1,17163.0,11433.0,2005-08-11 21:35:10,,NaT,518,1,1,1,20050802
2,22102.0,14714.0,2005-08-30 22:26:43,3.99,2005-08-21 21:27:43,279,1,1,1,20050821
3,1492.0,972.0,2005-06-06 00:36:07,,NaT,411,1,1,1,20050530
4,3182.0,2117.0,2005-06-23 17:45:00,0.99,2005-06-17 20:24:00,170,2,1,1,20050617


In [39]:
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rentals.return_date = df_fact_rentals.return_date.astype('datetime64[ns]').dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_return_date, on='return_date', how='left')
df_fact_rentals.drop(['return_date'], axis=1, inplace=True)
df_fact_rentals.head()

Unnamed: 0,fact_rentals_key,rental_id,amount,payment_date,customer_key,staff_key,film_key,store_key,rental_date_key,return_date_key
0,7322.0,4863.0,,NaT,431,2,1,1,20050708,20050711.0
1,17163.0,11433.0,,NaT,518,1,1,1,20050802,20050811.0
2,22102.0,14714.0,3.99,2005-08-21 21:27:43,279,1,1,1,20050821,20050830.0
3,1492.0,972.0,,NaT,411,1,1,1,20050530,20050606.0
4,3182.0,2117.0,0.99,2005-06-17 20:24:00,170,2,1,1,20050617,20050623.0


In [40]:
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_rentals.payment_date = df_fact_rentals.payment_date.astype('datetime64[ns]').dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_payment_date, on='payment_date', how='left')
df_fact_rentals.drop(['payment_date'], axis=1, inplace=True)
df_fact_rentals.head()

Unnamed: 0,fact_rentals_key,rental_id,amount,customer_key,staff_key,film_key,store_key,rental_date_key,return_date_key,payment_date_key
0,7322.0,4863.0,,431,2,1,1,20050708,20050711.0,
1,17163.0,11433.0,,518,1,1,1,20050802,20050811.0,
2,22102.0,14714.0,3.99,279,1,1,1,20050821,20050830.0,20050821.0
3,1492.0,972.0,,411,1,1,1,20050530,20050606.0,
4,3182.0,2117.0,0.99,170,2,1,1,20050617,20050623.0,20050617.0
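
The three date cells above repeat the same steps, and the keys with missing dates end up as floats (20050711.0). Here's a sketch of a helper that does the lookup once per column and casts the key to pandas' nullable `Int64` type so it stays an integer. `add_date_key` and the tiny frames are illustrative, assuming dim_date has the date_key and full_date columns used earlier.

```python
import pandas as pd


def add_date_key(fact, dim_date, date_col):
    """Replace date_col in fact with a nullable-integer <date_col>_key from dim_date."""
    key_col = f"{date_col}_key"
    lookup = dim_date.rename(columns={"date_key": key_col, "full_date": date_col})
    out = fact.copy()
    out[date_col] = pd.to_datetime(out[date_col]).dt.date  # strip the time part
    out = out.merge(lookup, on=date_col, how="left").drop(columns=[date_col])
    out[key_col] = out[key_col].astype("Int64")  # keeps missing keys as <NA>, not NaN floats
    return out


days = pd.date_range("2005-08-20", periods=3)
dim_date = pd.DataFrame({"date_key": [20050820, 20050821, 20050822], "full_date": days.date})
fact = pd.DataFrame({"rental_id": [14714, 4863],
                     "payment_date": pd.to_datetime(["2005-08-21 21:27:43", None])})

keyed = add_date_key(fact, dim_date, "payment_date")
print(keyed)
```

Calling it in a loop over rental_date, return_date, and payment_date would cover all three cells.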


And that looks pretty good! My only quarrel at this point is that "amount" looks really weird so I'm going to change that to paid_amount, but otherwise it's ready to be shipped off!

In [41]:
df_fact_rentals.rename(columns={"amount": "paid_amount"}, inplace=True)
df_fact_rentals.head()

Unnamed: 0,fact_rentals_key,rental_id,paid_amount,customer_key,staff_key,film_key,store_key,rental_date_key,return_date_key,payment_date_key
0,7322.0,4863.0,,431,2,1,1,20050708,20050711.0,
1,17163.0,11433.0,,518,1,1,1,20050802,20050811.0,
2,22102.0,14714.0,3.99,279,1,1,1,20050821,20050830.0,20050821.0
3,1492.0,972.0,,411,1,1,1,20050530,20050606.0,
4,3182.0,2117.0,0.99,170,2,1,1,20050617,20050623.0,20050617.0


In [42]:
db_operation = "insert"

tables = [('fact_rentals', df_fact_rentals, 'fact_rentals_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(mysql_args['uid'], mysql_args['pwd'], mysql_args['hostname'], mysql_args['dst_db'], dataframe, table_name, primary_key, db_operation)

## Step 5: SQL Query to prove that the fact_rentals table works
In this case, I'll be running a query that someone might actually want answered: does a customer spend more depending on which staff member served them? So basically, I'm querying:
1. The fact_rentals table, to get the paid_amount
2. The dim_customers table, to get the names of individuals
3. The dim_staff table, to get the names of the staff 

And so I'll basically be displaying each customer along with the amount of money they have spent with each staff member that assisted them. Note that in previous labs we only used last name, but that doesn't seem very accurate to me here since this dataset has a lot of shared last names! Thus, I concatenated the first and last name to make the full customer name!

In [43]:
sql_staff_efficiency = """
    SELECT CONCAT(customers.`first_name`, ' ', customers.`last_name`) AS `Customer Name`,
        SUM(rentals.`paid_amount`) AS `Total Money Spent`,
        CONCAT(staff.`first_name`, ' ', staff.`last_name`) AS `Staff Name`
    FROM `{0}`.`fact_rentals` AS rentals
    INNER JOIN `{0}`.`dim_customers` AS customers
    ON rentals.customer_key = customers.customer_key
    INNER JOIN `{0}`.`dim_staff` AS staff
    ON rentals.staff_key = staff.staff_key
    GROUP BY `Customer Name`, `Staff Name`
    ORDER BY `Customer Name`;
""".format(mysql_args['dst_db'])

In [44]:
df_staff_eff = get_sql_dataframe(sql_staff_efficiency, **mysql_args)
df_staff_eff

Unnamed: 0,Customer Name,Total Money Spent,Staff Name
0,AARON SELBY,22.96,Jon Stephens
1,AARON SELBY,21.95,Mike Hillyer
2,ADAM GOOCH,19.94,Jon Stephens
3,ADAM GOOCH,26.92,Mike Hillyer
4,ADRIAN CLARY,20.96,Jon Stephens
...,...,...,...
1193,YOLANDA WEAVER,21.94,Mike Hillyer
1194,YVONNE WATKINS,25.96,Jon Stephens
1195,YVONNE WATKINS,16.95,Mike Hillyer
1196,ZACHARY HITE,46.91,Jon Stephens


And that looks pretty good! We've got our customer's name, the staff member that helped them, and the comparative amounts right next to each other.
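
One caveat on grouping by the displayed name: if two customers ever shared a full name, their totals would be added together. Grouping on the keys avoids that. The sketch below shows the idea with the standard library's SQLite as a stand-in for MySQL (so `||` replaces `CONCAT`) and with made-up rows, including two different customers who are both named ANN LEE.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_customers (customer_key INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT);
    CREATE TABLE dim_staff (staff_key INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT);
    CREATE TABLE fact_rentals (customer_key INTEGER, staff_key INTEGER, paid_amount REAL);
    INSERT INTO dim_customers VALUES (1, 'ANN', 'LEE'), (2, 'ANN', 'LEE');
    INSERT INTO dim_staff VALUES (1, 'Mike', 'Hillyer');
    INSERT INTO fact_rentals VALUES (1, 1, 2.0), (1, 1, 3.0), (2, 1, 4.0), (2, 1, NULL);
""")

# Group on the surrogate keys; the names are only there for display.
rows = conn.execute("""
    SELECT c.customer_key,
           c.first_name || ' ' || c.last_name AS customer_name,
           s.first_name || ' ' || s.last_name AS staff_name,
           SUM(r.paid_amount) AS total_spent
    FROM fact_rentals AS r
    INNER JOIN dim_customers AS c ON r.customer_key = c.customer_key
    INNER JOIN dim_staff AS s ON r.staff_key = s.staff_key
    GROUP BY c.customer_key, s.staff_key
    ORDER BY c.customer_key
""").fetchall()
conn.close()

print(rows)
```

The two ANN LEE customers stay on separate rows (totals 5.0 and 4.0), and the NULL paid_amount is simply skipped by `SUM`.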

Now, typically in labs we do two queries (one is for extra credit), so I'll just do another here that might demonstrate the capabilities of this OLAP database better:

## Step 5.1: SQL Query #2 :)
The question is: how much money does a customer spend per minute of movie watched? Let's break this down. We'll need to query:
1. The fact_rentals table again, for the paid_amount
2. The dim_customers table again, for the customer's name
3. The dim_films table, to know the length of the movies

And so we'll basically be dividing the paid_amount for each rental by the length of its film and then averaging that per customer.

In [45]:
sql_movie_cost_avg = """
    SELECT CONCAT(customers.`first_name`, ' ', customers.`last_name`) AS `Customer Name`,
        AVG(rentals.`paid_amount` / films.`length`) AS `Average Amount of Money Spent per Minute`
    FROM `{0}`.`fact_rentals` AS rentals
    INNER JOIN `{0}`.`dim_customers` AS customers
    ON rentals.customer_key = customers.customer_key
    INNER JOIN `{0}`.`dim_films` AS films
    ON rentals.film_key = films.film_key
    GROUP BY `Customer Name`
    ORDER BY `Customer Name`;
""".format(mysql_args['dst_db'])

In [46]:
df_movie_cost_avg = get_sql_dataframe(sql_movie_cost_avg, **mysql_args)
df_movie_cost_avg

Unnamed: 0,Customer Name,Average Amount of Money Spent per Minute
0,AARON SELBY,0.052975
1,ADAM GOOCH,0.040776
2,ADRIAN CLARY,0.035849
3,AGNES BISHOP,0.031992
4,ALAN KAHN,0.047803
...,...,...
594,WILLIE MARKHAM,0.041915
595,WILMA RICHARDS,0.042021
596,YOLANDA WEAVER,0.042388
597,YVONNE WATKINS,0.039064


And hey, that's pretty nice! Looks like our customers are only spending a few cents per minute of a movie that they get to enjoy - a shame Blockbuster went out of business.