### Define Connection Variables and Functions

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

#### Define Variables for MySQL Server & MongoDB Server

In [2]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Rosie3402"

src_dbname = "sakila"
dst_dbname = "sakila_db"


mysql_args = {
    "uid" : "root",
    "pwd" : "Rosie3402",
    "hostname" : "localhost",
    "user_id": "root",
    "dbname" : "sakila"
}

# Use when running test queries at the end of the notebook
mysql_args_test = {
    "uid" : "root",
    "pwd" : "Rosie3402",
    "hostname" : "localhost",
    "user_id": "root",
    "dbname" : "sakila_db" # Destination database 
}


# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "kelseymatsik",
    "password" : "Rosie3402",
    "cluster_name" : "DevCluster",
    "cluster_subnet" : "ed41k",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila"
}

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

# def set_dataframe(df, table_name, pk_column, db_operation, **args):
#     '''Create a connection to the MySQL database'''
#     conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}" 
#     sqlEngine = create_engine(conn_str, pool_recycle=3600)
#     connection = sqlEngine.connect()
    
#     # K: Disable foreign key checks 
#     connection.execute("SET foreign_key_checks = 0;")
    
#     '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
#     if db_operation == "insert":
#         df.to_sql(table_name, con=connection, index=False, if_exists='replace')
#         connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
#     elif db_operation == "update":
#         df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
#     connection.close()
def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
        
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Create Destination Data Warehouse (Database)

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

### Get Dimension Tables 

#### Extract Data from Local Files 

In [5]:
data_dir = os.path.join(os.getcwd(), "local_files")

data_file_actor = os.path.join(data_dir, 'actor.csv')

df_actor = pd.read_csv(data_file_actor, header=0)
df_actor.head(2)

Unnamed: 0,actor_id,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-15 04:34:33
1,2,NICK,WAHLBERG,2006-02-15 04:34:33


In [6]:
data_file_address = os.path.join(data_dir, 'address.csv')

df_address = pd.read_csv(data_file_address, header=0)
df_address.head(2)

Unnamed: 0,address_id,address,address2,district,city_id,postal_code,phone,location,last_update
0,1,47 MySakila Drive,,Alberta,300,,,...,2014-09-25 22:30:27
1,2,28 MySQL Boulevard,,QLD,576,,,...,2014-09-25 22:30:09


#### Extract Data from MongoDB

First, populate MongoDB with source data.       

In [None]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then appends the 'data' directory
data_dir = os.path.join(os.getcwd(), "mongodb_files")

json_files = {"category": "category.json", 
              "city": "city.json"}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)   

Then, retrieve data from MongoDB. 

In [18]:
# Category 
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "category"

df_category = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_category.head(2)

Unnamed: 0,category_id,name,last_update
0,1,Action,2006-02-15 04:46:27
1,2,Animation,2006-02-15 04:46:27


In [19]:
# City
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "city"

df_city = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_city.head(2)

Unnamed: 0,city_id,city,country_id,last_update
0,1,A Coruña (La Coruña),87,2006-02-15 04:45:25
1,2,Abha,82,2006-02-15 04:45:25


#### Extract Data from Source Database (SQL Workbench) 

In [7]:
sql_country = "SELECT * FROM sakila.country;"
df_country = get_sql_dataframe(sql_country, **mysql_args)
df_country.head(2)

Unnamed: 0,country_id,country,last_update
0,1,Afghanistan,2006-02-15 04:44:00
1,2,Algeria,2006-02-15 04:44:00


In [8]:
sql_customer = "SELECT * FROM sakila.customer;"
df_customer = get_sql_dataframe(sql_customer, **mysql_args)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [9]:
sql_film = "SELECT * FROM sakila.film;"
df_film = get_sql_dataframe(sql_film, **mysql_args)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [10]:
sql_film_actor = "SELECT * FROM sakila.film_actor;"
df_film_actor = get_sql_dataframe(sql_film_actor, **mysql_args)
df_film_actor.head(2)

Unnamed: 0,actor_id,film_id,last_update
0,1,1,2006-02-15 05:05:03
1,1,23,2006-02-15 05:05:03


In [11]:
sql_film_category = "SELECT * FROM sakila.film_category;"
df_film_category = get_sql_dataframe(sql_film_category, **mysql_args)
df_film_category.head(2)

Unnamed: 0,film_id,category_id,last_update
0,1,6,2006-02-15 05:07:09
1,2,11,2006-02-15 05:07:09


In [12]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_sql_dataframe(sql_staff, **mysql_args)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [None]:
## EXTRA
# sql_film_text = "SELECT * FROM sakila.film_text;"
# df_film_text = get_sql_dataframe(sql_film_text, **mysql_args)
# df_film_text.head(2)

In [None]:
## EXTRA
# sql_language = "SELECT * FROM sakila.language;"
# df_language = get_sql_dataframe(sql_language, **mysql_args)
# df_language.head(2)

In [None]:
## EXTRA 
## Contains redundant / unnecessary info that 'staff' table already has
# sql_store = "SELECT * FROM sakila.store;"
# df_store = get_sql_dataframe(sql_store, **mysql_args)
# df_store.head(2)

### Create the Date Dimension Table
Execute the SQL Script from Lab2c that creates and populates a Date Dimension table. 

### Perform Neccessary Transformations 

#### Check for Missing Values

In [None]:
df_actor.isna().sum() / len(df_actor)

In [None]:
df_address.isna().sum() / len(df_address)

# Address2 has some missing values, but this makes sense because not all 
# addresses will have a second address line 

In [None]:
df_category.isna().sum() / len(df_category)

In [None]:
df_city.isna().sum() / len(df_city)

In [None]:
df_country.isna().sum() / len(df_country)

In [None]:
df_customer.isna().sum() / len(df_customer)

In [20]:
print(df_film.isna().sum() / len(df_film))

# Inspect missing values
df_film.head(2)

# Original_language_id has 100% missing values, so we'll drop this column 
df_film.drop(["original_language_id"], axis = 1, inplace = True)

film_id                 0.0
title                   0.0
description             0.0
release_year            0.0
language_id             0.0
original_language_id    1.0
rental_duration         0.0
rental_rate             0.0
length                  0.0
replacement_cost        0.0
rating                  0.0
special_features        0.0
last_update             0.0
dtype: float64


In [21]:
df_film_actor.isna().sum() / len(df_film_actor)

actor_id       0.0
film_id        0.0
last_update    0.0
dtype: float64

In [22]:
df_film_category.isna().sum() / len(df_film_category)

film_id        0.0
category_id    0.0
last_update    0.0
dtype: float64

In [23]:
df_staff.isna().sum() / len(df_staff)
# Picture has 50% missing value; we'll drop it below

staff_id       0.0
first_name     0.0
last_name      0.0
address_id     0.0
picture        0.5
email          0.0
store_id       0.0
active         0.0
username       0.0
password       0.5
last_update    0.0
dtype: float64

#### Drop Any Unncessary Columns 

In [24]:
df_actor.drop(["last_update"], axis=1, inplace=True)
df_actor.head(2)

Unnamed: 0,actor_id,first_name,last_name
0,1,PENELOPE,GUINESS
1,2,NICK,WAHLBERG


In [25]:
df_address[["phone", "location", "postal_code"]].head(2)
df_address.drop(["phone", "location", "postal_code", "last_update"], axis=1, inplace=True)
df_address.head(2)

Unnamed: 0,address_id,address,address2,district,city_id
0,1,47 MySakila Drive,,Alberta,300
1,2,28 MySQL Boulevard,,QLD,576


In [26]:
df_category.drop(["last_update"], axis=1, inplace=True)
df_category.head(2)

Unnamed: 0,category_id,name
0,1,Action
1,2,Animation


In [27]:
df_city.drop(["last_update"], axis=1, inplace=True)
df_city.head(2)

Unnamed: 0,city_id,city,country_id
0,1,A Coruña (La Coruña),87
1,2,Abha,82


In [28]:
df_country.drop(["last_update"], axis=1, inplace=True)
df_country.head(2)

Unnamed: 0,country_id,country
0,1,Afghanistan
1,2,Algeria


In [29]:
df_customer.drop(["create_date", "last_update"], axis=1, inplace=True)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1


In [30]:
df_film.drop(["last_update"], axis=1, inplace=True)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"


In [31]:
df_film_actor.drop(["last_update"], axis=1, inplace=True)
df_film_actor.head(2)

Unnamed: 0,actor_id,film_id
0,1,1
1,1,23


In [32]:
df_film_category.drop(["last_update"], axis=1, inplace=True)
df_film_category.head(2)

Unnamed: 0,film_id,category_id
0,1,6
1,2,11


In [33]:
# df_staff.drop(["last_update", "picture"], axis=1, inplace=True)

# Reorder columns 
col_order = ['staff_id', 'store_id', 'address_id', 'first_name', 'last_name', 'email',
        'active', 'username', 'password']
df_staff = df_staff[col_order]

df_staff.head(2)

Unnamed: 0,staff_id,store_id,address_id,first_name,last_name,email,active,username,password
0,1,1,3,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
1,2,2,4,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,Jon,


#### Combine address, city, and country tables

In [34]:
# Merge address and city tables 
df_address = pd.merge(df_address, df_city, on="city_id", how="inner")
df_address.head(2)

Unnamed: 0,address_id,address,address2,district,city_id,city,country_id
0,1,47 MySakila Drive,,Alberta,300,Lethbridge,20
1,2,28 MySQL Boulevard,,QLD,576,Woodridge,8


In [35]:
df_address = pd.merge(df_address, df_country, on="country_id", how="inner")
df_address.head(2)

Unnamed: 0,address_id,address,address2,district,city_id,city,country_id,country
0,1,47 MySakila Drive,,Alberta,300,Lethbridge,20,Canada
1,2,28 MySQL Boulevard,,QLD,576,Woodridge,8,Australia


In [36]:
# Reorder columns with primary and foreign keys first 

print(df_address.columns)
col_order = ['address_id', 'city_id', 'country_id', 'address', 'address2', 'district', 'city', 'country']
df_address = df_address[col_order]
df_address.head(2)

Index(['address_id', 'address', 'address2', 'district', 'city_id', 'city',
       'country_id', 'country'],
      dtype='object')


Unnamed: 0,address_id,city_id,country_id,address,address2,district,city,country
0,1,300,20,47 MySakila Drive,,Alberta,Lethbridge,Canada
1,2,576,8,28 MySQL Boulevard,,QLD,Woodridge,Australia


#### Combine film, film_actor, and film_category tables 

In [37]:
# Merge film and film_actor tables 
df_film = pd.merge(df_film, df_film_actor, on="film_id", how="inner")
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,actor_id
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",1
1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",10


In [38]:
# Merge film and film_category tables 
df_film = pd.merge(df_film, df_film_category, on="film_id", how="inner")

# Reorder columns 
col_order = ['film_id', 'actor_id', 'category_id', 'title', 'description', 'release_year', 'language_id',
       'rental_duration', 'rental_rate', 'length', 'replacement_cost',
       'rating', 'special_features']
df_film = df_film[col_order]

df_film.head(2)

Unnamed: 0,film_id,actor_id,category_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,1,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,1,10,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"


In [39]:
# Drop duplicates 
df_film = df_film.drop_duplicates(subset='film_id', keep='first')

### Load Dimension Tables into Destination Database (sakila_db)

In [40]:
db_operation = "insert"

tables = [('dim_actor', df_actor, 'actor_id'), 
          ('dim_address', df_address, 'address_id'),
          ('dim_category', df_category, 'category_id'), 
          ('dim_customer', df_customer, 'customer_id'),
          ('dim_film', df_film, 'film_id'), 
          ('dim_staff', df_staff, 'staff_id')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### Create and Populate the Fact Tables 

In the sakila database, **rental**, **payment**, and **inventory** are our fact tables because they contain a primary key and several foreign keys which link to other table's primary keys. 

**e.g. Primary and Foreign Keys**
* In the rental table: 
    * rental_id is the primary key.
    * inventory_id, customer_id, and staff_id are all foreign keys which link to the primary keys in the inventory, customer, and staff tables. 

#### Get Source Data for Fact Tables

In [41]:
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_sql_dataframe(sql_inventory, **mysql_args)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [42]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_sql_dataframe(sql_payment, **mysql_args)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [43]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_sql_dataframe(sql_rental, **mysql_args)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


#### Check Dimension Tables for Missing Values

In [44]:
df_inventory.isna().sum() / len(df_inventory)

inventory_id    0.0
film_id         0.0
store_id        0.0
last_update     0.0
dtype: float64

In [45]:
df_payment.isna().sum() / len(df_payment)

payment_id      0.0
customer_id     0.0
staff_id        0.0
rental_id       0.0
amount          0.0
payment_date    0.0
last_update     0.0
dtype: float64

In [46]:
df_rental.isna().sum() / len(df_rental)

# 1% of return_date values are missing, but this isn't an issue

rental_id       0.000000
rental_date     0.000000
inventory_id    0.000000
customer_id     0.000000
return_date     0.011406
staff_id        0.000000
last_update     0.000000
dtype: float64

#### Drop Unneccessary Columns

In [47]:
df_rental.drop(["last_update"], axis=1, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1


In [48]:
df_inventory.drop(["last_update"], axis=1, inplace=True)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id
0,1,1,1
1,2,1,1


In [49]:
df_payment.drop(["last_update"], axis=1, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date
0,1,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,1,573,0.99,2005-05-28 10:35:23


####  Convert dates to datetime format 

In [50]:
df_rental.rental_date = df_rental.rental_date.astype('datetime64[ns]').dt.date
df_rental.return_date = df_rental.return_date.astype('datetime64[ns]').dt.date
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id
0,1,2005-05-24,367,130,2005-05-26,1
1,2,2005-05-24,1525,459,2005-05-28,1


In [51]:
df_payment.payment_date = df_payment.payment_date.astype('datetime64[ns]').dt.date
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date
0,1,1,1,76,2.99,2005-05-25
1,2,1,1,573,0.99,2005-05-28


#### Combine rental and inventory tables

In [52]:
# Merge rental and inventory columns 
df_rental = pd.merge(df_rental, df_inventory, on="inventory_id", how="inner")

# Reorder columns to put primary and foreign keys first 
col_order = ['rental_id', 'inventory_id',  'film_id', 'customer_id', 'rental_date',
       'return_date']
df_rental = df_rental[col_order]

df_rental.head(2)

Unnamed: 0,rental_id,inventory_id,film_id,customer_id,rental_date,return_date
0,1,367,80,130,2005-05-24,2005-05-26
1,2,1525,333,459,2005-05-24,2005-05-28


### Get the Date Dimension Table as a Dataframe

In [53]:
sql_date = "SELECT * FROM sakila.dim_date;"
df_date = get_sql_dataframe(sql_date, **mysql_args)
df_date.head(2)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### Perform Transformations to the date dimension table

In [54]:
# Drop unnecessary columns
df_date.drop(['is_last_day_of_month', 'calendar_quarter', 'calendar_year',
       'calendar_year_month', 'calendar_year_qtr', 'fiscal_month_of_year', 
              'fiscal_year', 'fiscal_year_month', 'week_of_year'], 
            axis=1, inplace=True)

# Reorder columns 
col_order = ['date_key', 'full_date', 'date_name', 'date_name_us', 'date_name_eu',
       'day_of_week', 'day_name_of_week', 'day_of_month',  'month_name', 'month_of_year', 
       'day_of_year','weekday_weekend', 'fiscal_quarter', 'fiscal_year_qtr']

df_date = df_date[col_order]
df_date.head(2)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,month_name,month_of_year,day_of_year,weekday_weekend,fiscal_quarter,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,January,1,1,Weekend,3,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,January,1,2,Weekend,3,2000Q3


#### Combine payment and date dimension table

In [55]:
# Rename full_date column 
df_date = df_date.rename(columns={"full_date": "payment_date"})
# df_date.head(2)
# Merge payment and date tables 
df_payment = pd.merge(df_payment, df_date, on="payment_date", how="left")
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,date_key,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,month_name,month_of_year,day_of_year,weekday_weekend,fiscal_quarter,fiscal_year_qtr
0,1,1,1,76,2.99,2005-05-25,20050525,2005/05/25,05/25/2005,25/05/2005,4,Wednesday,25,May,5,145,Weekday,4,2005Q4
1,2,1,1,573,0.99,2005-05-28,20050528,2005/05/28,05/28/2005,28/05/2005,7,Saturday,28,May,5,148,Weekend,4,2005Q4


### Load Fact Tables into Destination Database (sakila_db)

In [56]:
db_operation = "insert"

tables = [('fact_rental', df_rental, 'rental_id'), 
          ('fact_payment', df_payment, 'payment_id')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

## Test SQL Queries to Demonstrate Functionality

In [57]:
sql_customer = "SELECT * FROM sakila_db.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_customer, **mysql_args)
df_dim_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1


In [58]:
sql_address = "SELECT * FROM sakila_db.dim_address;"
df_dim_address = get_sql_dataframe(sql_address, **mysql_args)
df_dim_address.head(2)

Unnamed: 0,address_id,city_id,country_id,address,address2,district,city,country
0,1,300,20,47 MySakila Drive,,Alberta,Lethbridge,Canada
1,2,576,8,28 MySQL Boulevard,,QLD,Woodridge,Australia


In [59]:
sql_payment = "SELECT * FROM sakila_db.fact_payment;"
df_fact_payment = get_sql_dataframe(sql_payment, **mysql_args)
df_fact_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,date_key,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,month_name,month_of_year,day_of_year,weekday_weekend,fiscal_quarter,fiscal_year_qtr
0,1,1,1,76,2.99,2005-05-25,20050525,2005/05/25,05/25/2005,25/05/2005,4,Wednesday,25,May,5,145,Weekday,4,2005Q4
1,2,1,1,573,0.99,2005-05-28,20050528,2005/05/28,05/28/2005,28/05/2005,7,Saturday,28,May,5,148,Weekend,4,2005Q4


In [61]:
# Gets the date of the 10 most recent movie rentals: full name of customers 
# who rented them, their addresses, the payment/rental date, and the amount  

query1 = """SELECT
                fp.payment_date, 
                fp.amount, 
                CONCAT(dc.first_name, ' ', dc.last_name) AS full_name, 
                da.address
            FROM 
                fact_payment AS fp
            JOIN 
                dim_customer AS dc ON fp.customer_id = dc.customer_id
            JOIN 
                dim_address AS da ON dc.address_id = da.address_id
            ORDER BY 
                fp.payment_date DESC
            LIMIT 10;"""

query1_result = get_sql_dataframe(query1, **mysql_args_test)
query1_result

Unnamed: 0,payment_date,amount,full_name,address
0,2006-02-14,3.98,MIGUEL BETANCOURT,319 Springs Loop
1,2006-02-14,0.0,MIGUEL BETANCOURT,319 Springs Loop
2,2006-02-14,0.99,JAY ROBB,1947 Paarl Way
3,2006-02-14,4.99,TOM MILNER,535 Ahmadnagar Manor
4,2006-02-14,3.98,BILL GAVIN,1485 Bratislava Place
5,2006-02-14,0.0,BILL GAVIN,1485 Bratislava Place
6,2006-02-14,4.99,GREG ROBINS,1786 Salinas Place
7,2006-02-14,0.99,DUSTIN GILLETTE,1854 Okara Boulevard
8,2006-02-14,0.99,DERRICK BOURQUE,1153 Allende Way
9,2006-02-14,0.99,ZACHARY HITE,98 Pyongyang Boulevard


In [62]:
# Gets the 10 most popular movies (movies rented the most times) 
# and the names of the actors that star in them 

query2 = """
    SELECT
        df.title, 
        CONCAT(da.first_name, ' ', da.last_name) AS actor_name, 
        COUNT(fr.film_id) AS rental_count
    FROM 
        fact_rental AS fr
    JOIN 
        dim_film AS df ON fr.film_id = df.film_id
    JOIN 
        dim_actor AS da ON df.actor_id = da.actor_id
    GROUP BY 
        df.title, da.first_name, da.last_name
    ORDER BY 
        rental_count DESC
    LIMIT 10;
"""

query2_result = get_sql_dataframe(query2, **mysql_args_test)
query2_result

Unnamed: 0,title,actor_name,rental_count
0,BUCKET BROTHERHOOD,RIP CRAWFORD,34
1,ROCKETEER MOTHER,JUDY DEAN,33
2,RIDGEMONT SUBMARINE,JOHNNY LOLLOBRIGIDA,32
3,JUGGLER HARDLY,MILLA PECK,32
4,FORWARD TEMPLE,RIP WINSLET,32
5,SCALAWAG DUCK,REESE KILMER,32
6,GRIT CLOCKWORK,CAMERON ZELLWEGER,32
7,APACHE DIVINE,NICK WAHLBERG,31
8,NETWORK PEAK,KIRK JOVOVICH,31
9,WIFE TURN,WOODY HOFFMAN,31
