In [9]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [11]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.30
Running PyMongo Version: 4.10.1


In [13]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Password123"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

mysql_args = {
    "uid" : "root",
    "pwd" : "Password123",
    "hostname" : "localhost",
    "dbname" : "sakila_dw"
}
mongodb_args = {
    "user_name" : "aen6ju",
    "password" : "Password123",
    "cluster_name" : "Cluster0SB",
    "cluster_subnet" : "od5pn",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila_dw2"
}

In [15]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

In [17]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

### 1.0. Create and Populate the New Dimension Tables

In [22]:
sql_customers = "SELECT * FROM sakila.customer;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(5)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [24]:
json_file_path = 'data/customer_data.json'
df_customers.to_json(json_file_path, orient='records')
print("Customer data exported to JSON.")

Customer data exported to JSON.


In [26]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.head(5)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30
2,3,1,1,1185,5.99,2005-06-15 00:54:12,2006-02-15 22:12:30
3,4,1,2,1422,0.99,2005-06-15 18:02:53,2006-02-15 22:12:30
4,5,1,2,1476,9.99,2005-06-15 21:08:46,2006-02-15 22:12:30


In [28]:
json_file_path = 'data/payment_data.json'
df_payment.to_json(json_file_path, orient='records')
print("Payment data exported to JSON.")

Payment data exported to JSON.


In [34]:
sql_staff = """
    SELECT staff_id, first_name, last_name, address_id, email, store_id, active, username, last_update
    FROM sakila.staff;
"""
df_staff = get_dataframe(user_id, pwd, host_name, src_dbname, sql_staff)
df_staff.head(5)

Unnamed: 0,staff_id,first_name,last_name,address_id,email,store_id,active,username,last_update
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,2006-02-15 03:57:16
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,2006-02-15 03:57:16


In [36]:
json_file_path = 'data/staff_data.json'
df_staff.to_json(json_file_path, orient='records')
print("Staff data exported to JSON.")

Staff data exported to JSON.


In [112]:
sql_store = "SELECT * FROM sakila.store;"
df_store = get_dataframe(user_id, pwd, host_name, src_dbname, sql_store)
df_store.head(5)

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-15 04:57:12
1,2,2,2,2006-02-15 04:57:12


In [114]:
json_file_path = 'data/store_data.json'
df_store.to_json(json_file_path, orient='records')
print("Store data exported to JSON.")

Store data exported to JSON.


### Creating new fact table

In [116]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
import pandas as pd



data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"customers" : 'customer_data.json',
              "payments" : 'payment_data.json',
              "staff" : 'staff_data.json',
              "store" : 'store_data.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)         

In [117]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "customers"

df_customers = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customers.head(5)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,1139954676000,1139979440000
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,1139954676000,1139979440000
2,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,1139954676000,1139979440000
3,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,1139954676000,1139979440000
4,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,1139954676000,1139979440000


In [120]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "payments"

df_payments = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_payments.head(5)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,1117020637000,1140041550000
1,2,1,1,573,0.99,1117276523000,1140041550000
2,3,1,1,1185,5.99,1118796852000,1140041550000
3,4,1,2,1422,0.99,1118858573000,1140041550000
4,5,1,2,1476,9.99,1118869726000,1140041550000


In [122]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "staff"

df_staff = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_staff.head(5)

Unnamed: 0,staff_id,first_name,last_name,address_id,email,store_id,active,username,last_update
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,1139975836000
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,1139975836000


In [124]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "store"

df_store = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_store.head(5)

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,1139979432000
1,2,2,2,1139979432000


In [None]:
Creating dim tables

In [150]:
df_customers['customer_key'] = range(1, len(df_customers) + 1)
set_dataframe(df_customers, 'dim_customer', 'customer_id', 'insert', **mysql_args)


In [128]:
sql_payment_dates = """
    SELECT DISTINCT payment_date
    FROM sakila.payment;
"""

df_dates = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment_dates)

df_dates['payment_date'] = pd.to_datetime(df_dates['payment_date'])

df_dates['year'] = df_dates['payment_date'].dt.year
df_dates['month'] = df_dates['payment_date'].dt.month
df_dates['day'] = df_dates['payment_date'].dt.day
df_dates['hour'] = df_dates['payment_date'].dt.hour
df_dates['minute'] = df_dates['payment_date'].dt.minute
df_dates['second'] = df_dates['payment_date'].dt.second

df_dates['date_key'] = range(1, len(df_dates) + 1)

df_dates = df_dates.drop(columns=['payment_date'])
df_dates['date'] = pd.to_datetime(df_dates[['year', 'month', 'day', 'hour', 'minute', 'second']])

df_dates.head(5)


Unnamed: 0,year,month,day,hour,minute,second,date_key,date
0,2005,5,25,11,30,37,1,2005-05-25 11:30:37
1,2005,5,28,10,35,23,2,2005-05-28 10:35:23
2,2005,6,15,0,54,12,3,2005-06-15 00:54:12
3,2005,6,15,18,2,53,4,2005-06-15 18:02:53
4,2005,6,15,21,8,46,5,2005-06-15 21:08:46


In [130]:
set_dataframe(df_dates, 'dim_date', 'date_key', 'insert', **mysql_args)

In [132]:
df_staff['staff_key'] = range(1, len(df_staff) + 1)
set_dataframe(df_staff, 'dim_staff', 'staff_key', 'insert', **mysql_args)

In [140]:
df_store['store_key'] = range(1, len(df_store) + 1)
set_dataframe(df_store, 'dim_store', 'store_key', 'insert', **mysql_args)
df_store.head()

Unnamed: 0,store_id,manager_staff_id,address_id,last_update,store_key
0,1,1,1,1139979432000,1
1,2,2,2,1139979432000,2


In [152]:
# Retrieve payment data including full datetime for payment_date
sql_payment = """
    SELECT payment_id, customer_id, staff_id, payment_date, amount 
    FROM sakila.payment;
"""

df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)

# Map customer_id to customer_key in dim_customer
df_payment = df_payment.merge(df_customers[['customer_id', 'customer_key']], on='customer_id', how='left')

# Map payment_date to date_key in dim_date
df_payment = df_payment.merge(df_dates[['date', 'date_key']], left_on='payment_date', right_on='date', how='left')

# Map staff_id to staff_key in dim_staff
df_payment = df_payment.merge(df_staff[['staff_id']], on='staff_id', how='left')

# Map staff_id to store_key via manager_staff_id in dim_store
df_payment = df_payment.merge(df_store[['manager_staff_id', 'store_key']], left_on='staff_id', right_on='manager_staff_id', how='left')

# Ensure payment_id is included in the final fact_payment DataFrame
fact_payment = df_payment[['payment_id', 'customer_key', 'date_key', 'staff_id', 'store_key', 'amount']]

# Load fact data into the fact_payment table, with payment_id as the primary key
set_dataframe(fact_payment, 'fact_payment', 'payment_id', 'insert', **mysql_args)


In [156]:
query_total_by_customer  = """
SELECT c.first_name, c.last_name, SUM(fp.amount) AS total_spent
FROM fact_payment fp
JOIN dim_customer c ON fp.customer_key = c.customer_key
GROUP BY c.first_name, c.last_name
ORDER BY total_spent DESC;
"""
df_query_total_by_customer = get_sql_dataframe(query_total_by_customer, **mysql_args)
df_query_total_by_customer

Unnamed: 0,first_name,last_name,total_spent
0,KARL,SEAL,221.55
1,ELEANOR,HUNT,216.54
2,CLARA,SHAW,195.58
3,RHONDA,KENNEDY,194.61
4,MARION,SNYDER,194.61
...,...,...,...
594,ANNIE,RUSSELL,58.82
595,JOHNNY,TURPIN,57.81
596,BRIAN,WYMAN,52.88
597,LEONA,OBRIEN,50.86


In [158]:
query_monthly_summary = """
SELECT d.year, d.month, SUM(fp.amount) AS monthly_revenue
FROM fact_payment fp
JOIN dim_date d ON fp.date_key = d.date_key
GROUP BY d.year, d.month
ORDER BY d.year, d.month;
"""

df_query_monthly_summary = get_sql_dataframe(query_monthly_summary, **mysql_args)
df_query_monthly_summary

Unnamed: 0,year,month,monthly_revenue
0,2005,5,4823.44
1,2005,6,9629.89
2,2005,7,28368.91
3,2005,8,24070.14
4,2006,2,514.18


In [160]:
query_by_staff  = """
SELECT s.first_name, s.last_name, COUNT(fp.payment_id) AS payments_processed, SUM(fp.amount) AS total_revenue
FROM fact_payment fp
JOIN dim_staff s ON fp.staff_id = s.staff_id
GROUP BY s.first_name, s.last_name
ORDER BY total_revenue DESC;
"""

df_query_by_staff = get_sql_dataframe(query_by_staff, **mysql_args)
df_query_by_staff

Unnamed: 0,first_name,last_name,payments_processed,total_revenue
0,Jon,Stephens,7990,33924.06
1,Mike,Hillyer,8054,33482.5


In [164]:
query_by_store = """
SELECT st.store_id, COUNT(fp.payment_id) AS payments_count, SUM(fp.amount) AS store_revenue
FROM fact_payment fp
JOIN dim_store st ON fp.store_key = st.store_key
GROUP BY st.store_id
ORDER BY store_revenue DESC;
"""

df_query_by_store = get_sql_dataframe(query_by_store, **mysql_args)
df_query_by_store

Unnamed: 0,store_id,payments_count,store_revenue
0,2,7990,33924.06
1,1,8054,33482.5
