# DS-2002 – Data Project 1

## Jason Giroux (jag8thv)

During this project I used the Sakila sample database for MySQL.
<!--
- actor: actor_id, first_name, last_name, last_update
- address: address_id, address, address2, district, city_id, postal_code, phone, location, last_update
  - has relationship through city_id

- category: category_id, name, last_update
- city: city_id, city, country_id
  - has relationship thru country_id

- country: country_id, country, last_update
- customer: customer_id, store_id, first_name, last_name, email, address_id, active, create_date, last_update
  - has relationship to store_id, address_id

- film: film_id, title, description, release_year, language_id, original_language_id (pretty null), rental_duration, rental_rate, length, replacement_cost, rating, special_features, last_update
  - has language_id, original_language_id
- film_actor: actor_id, film_id, last_update
- film_category: film_id, category_id, last_update
- film_text: film_id, title, description
- inventory: inventory_id, film_id, store_id, last_update
  - has film_id, store_id
- language: language_id, name, last_update
- payment: payment_id, customer_id, staff_id, rental_id, amount, payment_date, last_update
  - customer_id, staff_id, rental_id
- rental: rental_id, rental_date, inventory_id, customer_id, return_date, staff_id, last_update
  - inventory_id, customer_id, staff_id
- staff: staff_id, first_name, last_name, address_id, picture, email, store_id, active, username, password, last_update
  - address_id, store_id
- store: store_id, manager_staff_id, address_id, last_update
-->

### Tables

- fact_rental (loaded into sakila_dw as fact_rentals): from rental
- dim_staff: from staff, address (local file), city, country
- dim_customer: from customer (Mongo data), address (local file), city, country

### Submission Includes

- sakila-schema.sql: DDL for the Sakila database
- sakila-data.sql: Data for the Sakila database
- sakila_dw.sql: Creates the new sakila_dw data warehouse database
- customer.json: Customer table exported to a local JSON file with the MySQL export wizard, then uploaded to MongoDB
- address.json: Address table exported to a local JSON file with the MySQL export wizard
- sakila-date-dim.sql: Creates the dim_date date dimension table
- mongo-upload.ipynb: Uploads the customer data to MongoDB
- Project 1.ipynb: Main notebook with the project contents

### Steps
1. I created the Sakila database by executing the attached sakila-schema.sql and sakila-data.sql files.
1. I exported data from MySQL to serve as the local-file and MongoDB sources:
    - The customer table was exported to customer.json.
    - The address table was exported to address.json.
1. I ran mongo-upload.ipynb to upload the customer JSON data to MongoDB.
1. I ran sakila_dw.sql to create the sakila_dw database.
1. I extracted, transformed, and loaded data into sakila_dw from Sakila (MySQL), MongoDB, and my local file system.
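
The contents of mongo-upload.ipynb are not reproduced in this README. As a rough sketch only, an upload of this kind could read the exported JSON file and pass the records to a collection's `insert_many` method. The helper names `load_json_records` and `upload_records` are hypothetical, and the sketch assumes customer.json holds a JSON array of row objects. The `collection` argument is any object with an `insert_many` method, such as a pymongo collection.

```python
import json


def load_json_records(path):
    """Read a JSON file that is assumed to contain a list of row objects."""
    with open(path, encoding="utf-8") as handle:
        records = json.load(handle)
    if not isinstance(records, list):
        raise ValueError("expected a JSON array of records")
    return records


def upload_records(collection, records):
    """Insert the records through the collection's insert_many method.

    Returns the number of records sent. An empty list is skipped because
    pymongo's insert_many raises on an empty list.
    """
    if not records:
        return 0
    collection.insert_many(records)
    return len(records)
```

With pymongo, the collection could be obtained as `pymongo.MongoClient(conn_str)["sakila_customers"]["customers"]`, matching the database and collection names queried later in this README.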

### Creating the dim_date Table

I ran sakila-date-dim.sql to create the dim_date dimension table in the sakila_dw database.
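
The SQL in sakila-date-dim.sql is not shown here. Purely as an illustration of what a date dimension contains, the sketch below generates one row per calendar day in Python. The function name `build_date_rows` and the column names (`date_key`, `full_date`, and so on) are assumptions for this example, not the columns the script creates.

```python
from datetime import date, timedelta


def build_date_rows(start, end):
    """Return one dict per calendar day from start to end, inclusive."""
    rows = []
    current = start
    while current <= end:
        rows.append({
            # Integer key in YYYYMMDD form, e.g. 20050524
            "date_key": current.year * 10000 + current.month * 100 + current.day,
            "full_date": current.isoformat(),
            "year": current.year,
            "month": current.month,
            "day": current.day,
            # date.weekday() returns 5 for Saturday and 6 for Sunday
            "is_weekend": current.weekday() >= 5,
        })
        current += timedelta(days=1)
    return rows


rows = build_date_rows(date(2005, 5, 24), date(2005, 5, 28))
print(len(rows), rows[0]["date_key"], rows[-1]["is_weekend"])
```

A list like this could be turned into a DataFrame with `pd.DataFrame(rows)` and loaded like any other table.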

### Creating the fact_rental Table

I built the fact_rental table from the rental table in Sakila.

First, I imported the required libraries and defined the connection constants.

In [1]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine

In [2]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "iHaveTheBase"

src_mysql_dbname = "sakila"
dst_mysql_dbname = "sakila_dw"

I also created helper functions for reading SQL data into a DataFrame and writing a DataFrame back to SQL.

In [3]:
from sqlalchemy import text

def get_set_sql_dataframe(user_id, pwd, host_name):
    """Return (get_sql_dataframe, set_sql_dataframe) closures bound to one MySQL login."""
    def get_sql_dataframe(db_name, sql_query):
        # Pre-set these so the finally/return paths are safe if connecting fails
        connection = None
        dframe = None
        try:
            conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
            sqlEngine = create_engine(conn_str, pool_recycle=3600)
            connection = sqlEngine.connect()
            dframe = pd.read_sql(sql_query, connection)
        except Exception as e:
            print(e)
        finally:
            if connection is not None:
                connection.close()
        return dframe


    def set_sql_dataframe(db_name, df, table_name, pk_column, db_operation):
        connection = None
        try:
            conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
            sqlEngine = create_engine(conn_str, pool_recycle=3600)
            connection = sqlEngine.connect()

            if db_operation == "insert":
                # "insert" replaces any existing table, then adds the primary key
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # Engine.execute() was removed in SQLAlchemy 2.0, so run the DDL on the connection
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))

            elif db_operation == "update":
                # "update" appends rows to the existing table
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
        except Exception as e:
            print(e)
        finally:
            if connection is not None:
                connection.close()
    return get_sql_dataframe, set_sql_dataframe

get_sql_dataframe, set_sql_dataframe = get_set_sql_dataframe(user_id, pwd, host_name)

I then extracted, transformed, and loaded the rental data into the sakila_dw database.

In [4]:
df_rental = get_sql_dataframe(db_name=src_mysql_dbname, sql_query="SELECT * FROM sakila.rental;")
df_rental.head(2)

|   | rental_id | rental_date | inventory_id | customer_id | return_date | staff_id | last_update |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 2005-05-24 22:53:30 | 367 | 130 | 2005-05-26 22:04:30 | 1 | 2006-02-15 21:30:53 |
| 1 | 2 | 2005-05-24 22:54:33 | 1525 | 459 | 2005-05-28 19:40:33 | 1 | 2006-02-15 21:30:53 |


In [5]:
# drop inventory info, we won't use it
df_rental.drop(columns=["inventory_id"], inplace=True)
# rename id's to keys
df_rental.rename(columns={
    "rental_id": "rental_key", 
    "customer_id": "customer_key", 
    "staff_id": "staff_key",
}, inplace=True)
# insert our fact table key column
df_rental.insert(0, "fact_rental_key", range(1, df_rental.shape[0]+1))
df_rental.head(2)

|   | fact_rental_key | rental_key | rental_date | customer_key | return_date | staff_key | last_update |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 2005-05-24 22:53:30 | 130 | 2005-05-26 22:04:30 | 1 | 2006-02-15 21:30:53 |
| 1 | 2 | 2 | 2005-05-24 22:54:33 | 459 | 2005-05-28 19:40:33 | 1 | 2006-02-15 21:30:53 |


In [6]:
# Insert the newly transformed fact_rentals data into sakila_dw
set_sql_dataframe(
    db_name="sakila_dw", 
    df=df_rental, 
    table_name="fact_rentals", 
    pk_column="fact_rental_key", 
    db_operation="insert"
)
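
Because the insert branch of `set_sql_dataframe` adds a primary key after writing the table, the key column needs to be unique and free of nulls, or the `ALTER TABLE` statement can be expected to fail. A small pre-check along these lines could catch that before loading. `is_valid_primary_key` is a hypothetical helper, and the sample frame is made-up data rather than the real rental table.

```python
import pandas as pd


def is_valid_primary_key(df, column):
    """Return True when the column has no nulls and no repeated values."""
    series = df[column]
    return bool(series.notna().all() and series.is_unique)


# Made-up sample mirroring the surrogate key built with range(1, n + 1)
sample = pd.DataFrame({"rental_key": [10, 11, 12]})
sample.insert(0, "fact_rental_key", range(1, sample.shape[0] + 1))

print(is_valid_primary_key(sample, "fact_rental_key"))
```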

### Extract the Customer, Staff, Address, City, and Country Tables

<!-- I will then load all the tables required for the dim_customer and dim_staff tables (customer, address, city, country, staff) -->

I imported the required libraries, defined constants and a helper function, and then extracted the customer data from MongoDB.

In [7]:
import json
import pymongo

In [8]:
atlas_cluster_name = "sandbox.9yy6uzw"
atlas_user_name = "m001-student"
atlas_password = "m001-mongodb-basics"

conn_str = f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
src_mongo_dbname = "sakila_customers"

print(f"Atlas Connection String: {conn_str}")

Atlas Connection String: mongodb+srv://m001-student:m001-mongodb-basics@sandbox.9yy6uzw.mongodb.net


In [9]:
def get_mongo_dataframe(connect_str, db_name, collection, query):
    # Pre-set these so the finally/return paths are safe if connecting fails
    client = None
    dframe = None
    try:
        # Create a connection to MongoDB
        client = pymongo.MongoClient(connect_str)

        # Query MongoDB, and fill a Python list with documents to create a DataFrame
        db = client[db_name]
        dframe = pd.DataFrame(list(db[collection].find(query)))
        # Drop MongoDB's auto-generated document id
        dframe.drop(['_id'], axis=1, inplace=True)
    except Exception as e:
        print(e)
    finally:
        if client is not None:
            client.close()
    return dframe

In [10]:
df_customer = get_mongo_dataframe(
    connect_str=conn_str, 
    db_name=src_mongo_dbname, 
    collection="customers", 
    query={}
)
df_customer.head(2)

|   | customer_id | store_id | first_name | last_name | email | address_id | active | create_date | last_update |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | MARY | SMITH | MARY.SMITH@sakilacustomer.org | 5 | 1 | 2006-02-14 22:04:36 | 2006-02-15 04:57:20 |
| 1 | 2 | 1 | PATRICIA | JOHNSON | PATRICIA.JOHNSON@sakilacustomer.org | 6 | 1 | 2006-02-14 22:04:36 | 2006-02-15 04:57:20 |


I then extracted the staff table from the Sakila database.

In [11]:
# staff from sql
df_staff = get_sql_dataframe(db_name=src_mysql_dbname, sql_query="SELECT * FROM sakila.staff;")
df_staff.head(2)

|   | staff_id | first_name | last_name | address_id | picture | email | store_id | active | username | password | last_update |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Mike | Hillyer | 3 | `b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...` | Mike.Hillyer@sakilastaff.com | 1 | 1 | Mike | 8cb2237d0679ca88db6464eac60da96345513964 | 2006-02-15 03:57:16 |
| 1 | 2 | Jon | Stephens | 4 |  | Jon.Stephens@sakilastaff.com | 2 | 1 | Jon |  | 2006-02-15 03:57:16 |


I also read the address table from the JSON file on my local file system.

In [12]:
# address (local file)
json_file_location = os.path.join( os.getcwd(), 'address.json' )
try:
    df_address = pd.read_json(json_file_location)
    df_address.head(2)
except Exception as e:
    print(e)

I then extracted the city and country tables from the Sakila database.

In [13]:
# city from sql
df_city = get_sql_dataframe(db_name=src_mysql_dbname, sql_query="SELECT * FROM sakila.city;")
df_city.head(2)

|   | city_id | city | country_id | last_update |
|---|---|---|---|---|
| 0 | 1 | A Coruña (La Coruña) | 87 | 2006-02-15 04:45:25 |
| 1 | 2 | Abha | 82 | 2006-02-15 04:45:25 |


In [14]:
# country from sql
df_country = get_sql_dataframe(db_name=src_mysql_dbname, sql_query="SELECT * FROM sakila.country;")
df_country.head(2)

|   | country_id | country | last_update |
|---|---|---|---|
| 0 | 1 | Afghanistan | 2006-02-15 04:44:00 |
| 1 | 2 | Algeria | 2006-02-15 04:44:00 |


### Transforming and Loading the Customer Table
I built the customer dimension by merging the customer table with the address, city, and country tables.

In [15]:
# merge the customer, address, city, and country tables
df_dim_customer = df_customer.merge(
    right=df_address, 
    how="left", 
    on="address_id", 
    suffixes=("_customer", "_address")
).merge(
    right=df_city,
    how="left",
    on="city_id",
).merge(
    right=df_country,
    how="left",
    on="country_id",
    suffixes=("_city", "_country")
)
# drop extraneous id columns and BLOB data
df_dim_customer.drop(columns=[
    "store_id", 
    "address_id", 
    "city_id", 
    "country_id", 
    "location", 
], inplace=True)
# Rename id column to key
df_dim_customer.rename(columns={ "customer_id": "customer_key" }, inplace=True)

df_dim_customer.head(2)

|   | customer_key | first_name | last_name | email | active | create_date | last_update_customer | address | address2 | district | postal_code | phone | last_update_address | city | last_update_city | country | last_update_country |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | MARY | SMITH | MARY.SMITH@sakilacustomer.org | 1 | 2006-02-14 22:04:36 | 2006-02-15 04:57:20 | 1913 Hanoi Way |  | Nagasaki | 35200 | 28303384290 | 2014-09-25 22:31:53 | Sasebo | 2006-02-15 04:45:25 | Japan | 2006-02-15 04:44:00 |
| 1 | 2 | PATRICIA | JOHNSON | PATRICIA.JOHNSON@sakilacustomer.org | 1 | 2006-02-14 22:04:36 | 2006-02-15 04:57:20 | 1121 Loja Avenue |  | California | 17886 | 838635286649 | 2014-09-25 22:34:01 | San Bernardino | 2006-02-15 04:45:25 | United States | 2006-02-15 04:44:00 |
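
All four source tables carry a `last_update` column, which is why the merged frame ends up with four suffixed versions of it. The toy example below uses made-up single-row tables rather than Sakila data. It shows that the `suffixes` argument of `DataFrame.merge` only renames columns that collide in that particular merge.

```python
import pandas as pd

# Made-up single-row tables that each have a last_update column
customer = pd.DataFrame({"customer_id": [1], "address_id": [5], "last_update": ["c"]})
address = pd.DataFrame({"address_id": [5], "city_id": [9], "last_update": ["a"]})
city = pd.DataFrame({"city_id": [9], "country_id": [3], "last_update": ["ci"]})
country = pd.DataFrame({"country_id": [3], "country": ["Japan"], "last_update": ["co"]})

merged = (
    customer
    # last_update collides here, so both copies receive a suffix
    .merge(address, how="left", on="address_id", suffixes=("_customer", "_address"))
    # no collision: the left side no longer has a plain last_update column
    .merge(city, how="left", on="city_id")
    # city's plain last_update now collides with country's
    .merge(country, how="left", on="country_id", suffixes=("_city", "_country"))
)

print(sorted(c for c in merged.columns if c.startswith("last_update")))
```

The middle merge passes no suffixes because nothing collides at that step, which leaves the city timestamp as plain `last_update` until the final merge renames it.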


In [16]:
# load the transformed customer dim table into sakila_dw
set_sql_dataframe(
    db_name="sakila_dw", 
    df=df_dim_customer, 
    table_name="dim_customer", 
    pk_column="customer_key", 
    db_operation="insert"
)

### Transforming and Loading the Staff Table

I built the staff dimension the same way, merging the staff table with the address, city, and country tables.

In [17]:
# merge the staff, address, city, and country tables
df_dim_staff = df_staff.merge(
    right=df_address, 
    how="left", 
    on="address_id", 
    suffixes=("_staff", "_address")
).merge(
    right=df_city,
    how="left",
    on="city_id",
).merge(
    right=df_country,
    how="left",
    on="country_id",
    suffixes=("_city", "_country")
)
# drop extraneous id columns and BLOB data
df_dim_staff.drop(columns=[
    "store_id", 
    "address_id", 
    "city_id", 
    "country_id", 
    "picture",
    "password",
    "location", 
], inplace=True)
# Rename id column to key
df_dim_staff.rename(columns={ "staff_id": "staff_key" }, inplace=True)

df_dim_staff.head(2)

|   | staff_key | first_name | last_name | email | active | username | last_update_staff | address | address2 | district | postal_code | phone | last_update_address | city | last_update_city | country | last_update_country |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Mike | Hillyer | Mike.Hillyer@sakilastaff.com | 1 | Mike | 2006-02-15 03:57:16 | 23 Workhaven Lane |  | Alberta |  | 14033335568 | 2014-09-25 22:30:27 | Lethbridge | 2006-02-15 04:45:25 | Canada | 2006-02-15 04:44:00 |
| 1 | 2 | Jon | Stephens | Jon.Stephens@sakilastaff.com | 1 | Jon | 2006-02-15 03:57:16 | 1411 Lillydale Drive |  | QLD |  | 6172235589 | 2014-09-25 22:30:09 | Woodridge | 2006-02-15 04:45:25 | Australia | 2006-02-15 04:44:00 |


In [18]:
# load the transformed staff dim table into sakila_dw
set_sql_dataframe(
    db_name="sakila_dw", 
    df=df_dim_staff, 
    table_name="dim_staff", 
    pk_column="staff_key", 
    db_operation="insert"
)
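
Before querying the warehouse, it can be worth confirming that every key in the fact table has a matching dimension row. The following is a minimal sketch of such a check with made-up frames. `find_orphan_keys` is a hypothetical helper, not part of the project notebook.

```python
import pandas as pd


def find_orphan_keys(fact, dim, key):
    """Return the sorted fact-table key values that are missing from the dimension."""
    missing = fact.loc[~fact[key].isin(dim[key]), key]
    return sorted(missing.unique().tolist())


# Made-up data: customer_key 3 has no dimension row
fact = pd.DataFrame({"customer_key": [1, 2, 2, 3]})
dim = pd.DataFrame({"customer_key": [1, 2]})

print(find_orphan_keys(fact, dim, "customer_key"))
```

With the frames built above, the same call could be made as `find_orphan_keys(df_rental, df_dim_customer, "customer_key")`.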

### Demonstrating Proper Functionality

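The following SQL query sums the active flag across every rental that each staff member handled. Because a customer is counted once per rental, the result is the number of rentals made by active customers for each staff member, not the number of distinct customers.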

In [19]:
sql_aggregation = """
SELECT DS.first_name AS 'first name'
    , DS.last_name AS 'last name'
    , SUM(DC.active) AS 'number of active customers helped'
FROM sakila_dw.dim_customer AS DC
    INNER JOIN sakila_dw.fact_rentals AS FR
        ON DC.customer_key = FR.customer_key
    INNER JOIN sakila_dw.dim_staff AS DS
        ON FR.staff_key = DS.staff_key
GROUP BY DS.first_name, DS.last_name;
"""
df_aggregation = get_sql_dataframe(db_name=src_mysql_dbname, sql_query=sql_aggregation)
df_aggregation.head(2)

|   | first name | last name | number of active customers helped |
|---|---|---|---|
| 0 | Mike | Hillyer | 7855.0 |
| 1 | Jon | Stephens | 7785.0 |
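
Since the query sums `active` once per joined rental row, its result grows with the number of rentals rather than the number of customers. The sketch below reproduces the shape of the query on a tiny in-memory SQLite database with made-up rows (not the Sakila data, and SQLite rather than MySQL). It adds a `COUNT(DISTINCT ...)` column for comparison, and the output column names are chosen for this example only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_customer (customer_key INTEGER PRIMARY KEY, active INTEGER);
CREATE TABLE dim_staff (staff_key INTEGER PRIMARY KEY, first_name TEXT);
CREATE TABLE fact_rentals (fact_rental_key INTEGER PRIMARY KEY,
                           customer_key INTEGER, staff_key INTEGER);
-- Made-up rows: customer 1 is active, customer 2 is not
INSERT INTO dim_customer VALUES (1, 1), (2, 0);
INSERT INTO dim_staff VALUES (1, 'Mike');
-- Customer 1 rents three times, customer 2 once, all handled by staff 1
INSERT INTO fact_rentals VALUES (1, 1, 1), (2, 1, 1), (3, 1, 1), (4, 2, 1);
""")

result = conn.execute("""
SELECT DS.first_name,
       SUM(DC.active) AS rentals_by_active_customers,
       COUNT(DISTINCT CASE WHEN DC.active = 1 THEN DC.customer_key END)
           AS distinct_active_customers
FROM dim_customer AS DC
    INNER JOIN fact_rentals AS FR ON DC.customer_key = FR.customer_key
    INNER JOIN dim_staff AS DS ON FR.staff_key = DS.staff_key
GROUP BY DS.first_name;
""").fetchall()
conn.close()

print(result)
```

In this toy data the sum is 3 while only one distinct active customer exists, which is the same gap that separates the totals above from the size of the customer table.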
