In [331]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [333]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.30
Running PyMongo Version: 4.10.1


#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [455]:
mysql_args = {
    "uid" : "root",
    "pwd" : "Luckylemons12$",
    "hostname" : "localhost",
    "dbname" : "sakila2"
}

# The 'cluster_location' must either be "atlas" or "local".
#username: jsq4xr
# password: 5DltCXGlClkT1XtZ
mongodb_args = {
    "user_name" : "jsq4xr",
    "password" : "5DltCXGlClkT1XtZ",
    "cluster_name" : "Cluster0",
    "cluster_subnet" : "laoqh",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila_purchasing"
}

#### Define Functions for Getting Data From and Setting Data Into Databases

In [339]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

POPULATING MONGODB

In [453]:
#dims from MongoDB
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"customer" : 'sakila_customer.json',
             "rental" : 'sakila_rental.json',
             "staff" : 'sakila_staff.json',
             "inventory" : 'sakila_inventory.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)    

POPULATING NEW DIM TABLES

In [401]:
#FILM dim. from MySQL
sql_film = "SELECT * FROM sakila.film;"
df_film = get_sql_dataframe(sql_film, **mysql_args)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [359]:
#PAYMENT dim. from local csv
file_path = '/Users/Kaykizzzle/Documents/DS2002/sakila_payment.csv'
df_payment = pd.read_csv(file_path)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [367]:
#CUSTOMER dim. from MongoDB
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "customer"

df_customer = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [463]:
#INVENTORY dim. from MongoDB
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "inventory"

df_inventory = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [465]:
#STAFF dim. from MongoDB
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "staff"

df_staff = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,email,store_id,active,username,last_update
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,2006-02-15 03:57:16
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,2006-02-15 03:57:16


TRANSFORM NEW DIMENSIONS

In [369]:
#PAYMENT
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['last_update', 'customer_id', 'staff_id', 'rental_id']
df_payment.drop(drop_cols, axis=1, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_payment.insert(0, "payment_key", range(1, df_payment.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_payment.head(2)

Unnamed: 0,payment_key,payment_id,amount,payment_date
0,1,1,2.99,2005-05-25 11:30:37
1,2,2,0.99,2005-05-28 10:35:23


In [403]:
#FILM
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['original_language_id', 'last_update']
df_film.drop(drop_cols, axis=1, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_film.insert(0, "film_key", range(1, df_film.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"


In [373]:
#CUSTOMER
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['last_update', 'store_id', 'address_id']
df_customer.drop(drop_cols, axis=1, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_customer.head(2)

Unnamed: 0,customer_key,customer_id,first_name,last_name,email,active,create_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36
1,2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,2006-02-14 22:04:36


In [467]:
#INVENTORY
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['last_update', 'store_id']
df_inventory.drop(drop_cols, axis=1, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_inventory.insert(0, "inventory_key", range(1, df_inventory.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id
0,1,1,1
1,2,2,1


In [469]:
#STAFF
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['last_update', 'store_id', 'address_id']
df_staff.drop(drop_cols, axis=1, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_staff.insert(0, "staff_key", range(1, df_staff.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_staff.head(2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,email,active,username
0,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike
1,2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,Jon


WRITE NEW DIMS BACK TO MYSQL

In [457]:
#PAYMENT
dataframe = df_payment
table_name = 'dim_payment'
primary_key = 'payment_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [459]:
#CUSTOMER
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [461]:
#FILM
dataframe = df_film
table_name = 'dim_film'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [471]:
#INVENTORY
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [473]:
#STAFF
dataframe = df_staff
table_name = 'dim_staff'
primary_key = 'staff_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

VALIDATE NEW DIM TABLES

In [475]:
#PAYMENT
sql_payment = "SELECT * FROM sakila2.dim_payment;"
df_dim_payment = get_sql_dataframe(sql_payment, **mysql_args)
df_dim_payment.head(2)

Unnamed: 0,payment_key,payment_id,amount,payment_date
0,1,1,2.99,2005-05-25 11:30:37
1,2,2,0.99,2005-05-28 10:35:23


In [477]:
#CUSTOMER
sql_customer = "SELECT * FROM sakila2.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_customer, **mysql_args)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id,first_name,last_name,email,active,create_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36
1,2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,2006-02-14 22:04:36


In [479]:
#FILM
sql_film = "SELECT * FROM sakila2.dim_film;"
df_dim_film = get_sql_dataframe(sql_film, **mysql_args)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"


In [481]:
#inventory
sql_inventory = "SELECT * FROM sakila2.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_inventory, **mysql_args)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id
0,1,1,1
1,2,2,1


In [483]:
#staff
sql_staff = "SELECT * FROM sakila2.dim_staff;"
df_dim_staff = get_sql_dataframe(sql_staff, **mysql_args)
df_dim_staff.head(2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,email,active,username
0,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike
1,2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,Jon


CREATE AND POPULATE NEW FACT TABLE

In [509]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "rental"

df_fact_re = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_fact_re.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


LOOK UP DATE KEYS

In [511]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "rental_date" Column.
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_re.rental_date = df_fact_re.rental_date.astype('datetime64[ns]').dt.date
df_fact_re = pd.merge(df_fact_re, df_dim_rental_date, on='rental_date', how='left')
df_fact_re.drop(['rental_date'], axis=1, inplace=True)
df_fact_re.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,return_date,staff_id,last_update,rental_date_key
0,1,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,20050524
1,2,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,20050524


In [513]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "return_date" Column.
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_re.return_date = df_fact_re.return_date.astype('datetime64[ns]').dt.date
df_fact_re = pd.merge(df_fact_re, df_dim_return_date, on='return_date', how='left')
df_fact_re.drop(['return_date'], axis=1, inplace=True)
df_fact_re.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,last_update,rental_date_key,return_date_key
0,1,367,130,1,2006-02-15 21:30:53,20050524,20050526
1,2,1525,459,1,2006-02-15 21:30:53,20050524,20050528


In [514]:
#FACT RENTAL removing last_update date column for transformation purposes
drop_cols = ['last_update']
df_fact_re.drop(drop_cols, axis=1, inplace=True)

df_fact_re.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,rental_date_key,return_date_key
0,1,367,130,1,20050524,20050526
1,2,1525,459,1,20050524,20050528


In [None]:
PRIMARY KEYS FROM DIM TABLES

In [487]:
#PAYMENT
sql_dim_payment = "SELECT payment_key, payment_id FROM sakila2.dim_payment;"
df_dim_payment = get_sql_dataframe(sql_dim_payment, **mysql_args)
df_dim_payment.head(2)

Unnamed: 0,payment_key,payment_id
0,1,1
1,2,2


In [489]:
#CUSTOMER
sql_dim_customer = "SELECT customer_key, customer_id FROM sakila2.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_dim_customer, **mysql_args)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2


In [491]:
#FILM
sql_dim_film = "SELECT film_key, film_id FROM sakila2.dim_film;"
df_dim_film = get_sql_dataframe(sql_dim_film, **mysql_args)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id
0,1,1
1,2,2


In [485]:
#inventory
sql_dim_inventory = "SELECT inventory_key, inventory_id FROM sakila2.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_dim_inventory, **mysql_args)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id
0,1,1
1,2,2


In [493]:
#staff
sql_dim_staff = "SELECT staff_key, staff_id FROM sakila2.dim_staff;"
df_dim_staff = get_sql_dataframe(sql_dim_staff, **mysql_args)
df_dim_staff.head(2)

Unnamed: 0,staff_key,staff_id
0,1,1
1,2,2


In [None]:
BUSINESS KEYS TO LOOK UP SPK IN DIM TABLES

In [517]:
# Merged the "fact_re" and "dim_customer" dataframes on the 'customer_id' to
# get the 'customer_key'. Then drop the 'custormer_id' column and display the results.
df_fact_re = pd.merge(df_fact_re, df_dim_customer[['customer_key', 'customer_id']], on='customer_id', how='left')
df_fact_re.drop(['customer_id'], axis=1, inplace=True)
df_fact_re.head(2)

Unnamed: 0,rental_id,inventory_id,staff_id,rental_date_key,return_date_key,customer_key
0,1,367,1,20050524,20050526,130
1,2,1525,1,20050524,20050528,459


In [519]:
# Merged the "fact_re" and "dim_inventory" dataframes on the 'inventory_id' to
# get the 'inventory_key'. Then drop the 'inventory_id' column and display the results.
df_fact_re = pd.merge(df_fact_re, df_dim_inventory[['inventory_key', 'inventory_id']], on='inventory_id', how='left')
df_fact_re.drop(['inventory_id'], axis=1, inplace=True)
df_fact_re.head(2)

Unnamed: 0,rental_id,staff_id,rental_date_key,return_date_key,customer_key,inventory_key
0,1,1,20050524,20050526,130,367.0
1,2,1,20050524,20050528,459,


In [521]:
# Merged the "fact_re" and "dim_staff" dataframes on the 'staff_id' to
# get the 'staff_key'. Then drop the 'staff_id' column and display the results.
df_fact_re = pd.merge(df_fact_re, df_dim_staff[['staff_key', 'staff_id']], on='staff_id', how='left')
df_fact_re.drop(['staff_id'], axis=1, inplace=True)
df_fact_re.head(2)

Unnamed: 0,rental_id,rental_date_key,return_date_key,customer_key,inventory_key,staff_key
0,1,20050524,20050526,130,367.0,1
1,2,20050524,20050528,459,,1


In [523]:
# Inserted a new 'fact_rental_key' column, with an ever-incrementing
# numeric value, to serve as the surrogate primary key. Then display the results.
df_fact_re['fact_rental_key'] = range(1, len(df_fact_re) + 1)
df_fact_re.head(2)

Unnamed: 0,rental_id,rental_date_key,return_date_key,customer_key,inventory_key,staff_key,fact_rental_key
0,1,20050524,20050526,130,367.0,1,1
1,2,20050524,20050528,459,,1,2


In [None]:
PERFORMING NECESSARY FACT TRANSFORMATIONS

In [527]:
# Reorder the Columns
ordered_columns = ['fact_rental_key','rental_id','rental_date_key'
                   ,'customer_key','inventory_key'
                   ,'staff_key']

df_fact_re = df_fact_re[ordered_columns]
df_fact_re.head(2)

Unnamed: 0,fact_rental_key,rental_id,rental_date_key,customer_key,inventory_key,staff_key
0,1,1,20050524,130,367.0,1
1,2,2,20050524,459,,1


In [None]:
LOAD INTO DATA WAREHOUSE SAKILA2 AND VERIFY

In [529]:
dataframe = df_fact_re
table_name = 'fact_rental'
primary_key = 'fact_rental_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [537]:
sql_fact_re = "SELECT * FROM sakila2.fact_rental;"
df_fact_re = get_sql_dataframe(sql_fact_re, **mysql_args)
df_fact_re.head(2)

Unnamed: 0,fact_rental_key,rental_id,rental_date_key,customer_key,inventory_key,staff_key
0,1,1,20050524,130,367.0,1
1,2,2,20050524,459,,1


In [None]:
AUTHORING QUERIES TO PROVE THE VALIDITY OF MY DATAWAREHOUSE

In [559]:
#counts the number of rentals by each customer
# uses fact table combined with customer dim
sql_rental = """
SELECT 
    c.first_name,
    COUNT(fr.rental_id) AS total_rentals
FROM 
    fact_rental fr
JOIN 
    dim_customer c ON fr.customer_key = c.customer_key
GROUP BY 
    c.first_name
ORDER BY 
    total_rentals DESC;
"""

In [549]:
df_fact_rental = get_sql_dataframe(sql_rental, **mysql_args)
df_fact_rental

Unnamed: 0,first_name,total_rentals
0,SUE,8
1,LESLIE,7
2,RUTH,6
3,BILLY,6
4,COURTNEY,6
...,...,...
474,NANCY,1
475,SHANNON,1
476,DUANE,1
477,NATALIE,1


In [561]:
#calculates the total and average number of rentals processed by each staff member
# uses avg aggregation method with the staff dim and fact table
sql_rental = """
SELECT 
    s.first_name,
    COUNT(fr.rental_id) AS total_rentals,
    AVG(fr.rental_id) AS avg_rentals
FROM 
    fact_rental fr
JOIN 
    dim_staff s ON fr.staff_key = s.staff_key
GROUP BY 
    s.first_name
ORDER BY 
    total_rentals DESC;
"""

In [557]:
df_fact_rental = get_sql_dataframe(sql_rental, **mysql_args)
df_fact_rental

Unnamed: 0,first_name,total_rentals,avg_rentals
0,Jon,521,501.6027
1,Mike,479,500.7203


In [593]:
#calculates total rentals for each combination of customer and staff
#uses both customer and staff dims and fact table and uses the count aggregation method
sql_rental = """ 
SELECT 
    c.first_name,
    s.first_name,
    COUNT(fr.rental_id) AS total_rentals
FROM 
    fact_rental fr
JOIN 
    dim_customer c ON fr.customer_key = c.customer_key
JOIN 
    dim_staff s ON fr.staff_key = s.staff_key
GROUP BY 
    c.first_name, s.first_name
ORDER BY 
    total_rentals DESC;
"""

In [595]:
df_fact_rental = get_sql_dataframe(sql_rental, **mysql_args)
df_fact_rental

Unnamed: 0,first_name,first_name.1,total_rentals
0,ISAAC,Mike,5
1,COURTNEY,Jon,5
2,SUE,Mike,5
3,JANE,Mike,5
4,CASSANDRA,Jon,4
...,...,...,...
667,SHANNON,Mike,1
668,DUANE,Mike,1
669,NATALIE,Jon,1
670,ALAN,Mike,1
