## DS-2002 Data Project 1: Sakila Rental Business ETL Pipeline
This project demonstrates an ETL pipeline that extracts data from multiple sources (MySQL, MongoDB, CSV file), transforms it, and loads it into a dimensional data mart optimized for analyzing the Sakila DVD rental business.

### Business Process: DVD Rental Transactions
The data mart models rental transactions between customers and the rental store, tracking which films are rented, when they're rented, and payment information.

### Data Sources:
1. **MySQL Database**: Sakila sample database (customers, films, rentals, payments)
2. **MongoDB**: Date dimension (`dim_date` collection, loaded from sakila_dim_date.json)
3. **CSV File**: Store information (sakila_store.csv)

### Prerequisites:
Install the following libraries before running this notebook:
- `python -m pip install pymongo[srv]`
- `python -m pip install pymysql`
- `python -m pip install sqlalchemy`
- `python -m pip install pandas`
- `python -m pip install certifi`

#### Import the Necessary Libraries

In [1]:
import os
import json
import numpy as np
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.44
Running PyMongo Version: 4.15.3


#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Servers & Databases

In [3]:
# MySQL connection for source Sakila database
mysql_sakila_args = {
    "uid" : "root",
    "pwd" : "Arsenal-fan-$aka-7",
    "hostname" : "localhost",
    "dbname" : "sakila"
}

# MySQL connection for destination data warehouse
mysql_dw_args = {
    "uid" : "root",
    "pwd" : "Arsenal-fan-$aka-7",
    "hostname" : "localhost",
    "dbname" : "sakila_dw"
}

# MongoDB connection arguments
mongodb_args = {
    "user_name" : "nathanluu",
    "password" : "dFq6YFa5c6ggnmMf",
    "cluster_name" : "cluster0",
    "cluster_subnet" : "7ycvikt",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila"
}
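
Hard-coded passwords are easy to leak when a notebook is shared. One common alternative, sketched here, is to read them from environment variables. The helper `load_mysql_args` and the variable names (`SAKILA_MYSQL_UID`, `SAKILA_MYSQL_PWD`, `SAKILA_MYSQL_HOST`) are hypothetical and not part of the original notebook.

```python
import os

def load_mysql_args(dbname):
    """Build MySQL connection arguments from environment variables.

    The variable names are hypothetical; choose whatever fits your setup.
    """
    return {
        "uid": os.environ.get("SAKILA_MYSQL_UID", "root"),
        "pwd": os.environ["SAKILA_MYSQL_PWD"],  # raises KeyError if unset
        "hostname": os.environ.get("SAKILA_MYSQL_HOST", "localhost"),
        "dbname": dbname,
    }
```

The resulting dictionary has the same keys as `mysql_sakila_args` and `mysql_dw_args`, so it could be passed to the helper functions in the same way, for example `load_mysql_args("sakila_dw")`.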

#### Define Functions for Getting Data From and Setting Data Into Databases

In [4]:
def get_sql_dataframe(sql_query, **args):
    '''Run a SQL query against a MySQL database and return the result as a Pandas DataFrame.'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    # Query the database with pd.read_sql(); the connection is closed when the block exits.
    with sqlEngine.connect() as connection:
        dframe = pd.read_sql(text(sql_query), connection)

    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write a DataFrame to a MySQL table ("insert" recreates the table; "update" appends to it).'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()

    # "insert": replace the table with the DataFrame's contents, then add the primary key.
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))

    # "update": append the DataFrame's rows to the existing table.
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')

    connection.close()


def get_mongo_client(**args):
    '''Return a MongoClient for either a MongoDB Atlas cluster or a local server.'''
    # Validate proper input
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if args["cluster_location"] == "atlas":
        connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
        client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())

    else:
        client = pymongo.MongoClient("mongodb://localhost:27017/")

    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''Drop and reload each collection from its JSON file (json_files maps collection name to file name).'''
    db = mongo_client[db_name]

    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(data_directory, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        db[collection_name].insert_many(json_object)

    mongo_client.close()
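
The MySQL helpers above interpolate the password directly into the connection URL. If a password contains URL delimiters such as `@`, `:` or `/`, the URL can be parsed incorrectly. A minimal sketch of one way to guard against that, using only the standard library, is shown here; `build_mysql_url` is a hypothetical helper and the password is made up.

```python
from urllib.parse import quote_plus

def build_mysql_url(uid, pwd, hostname, dbname):
    """Return a mysql+pymysql URL with the user name and password percent-encoded."""
    return f"mysql+pymysql://{quote_plus(uid)}:{quote_plus(pwd)}@{hostname}/{dbname}"

# Example with a made-up password containing URL delimiters.
print(build_mysql_url("root", "p@ss:w/rd", "localhost", "sakila"))
# mysql+pymysql://root:p%40ss%3Aw%2Frd@localhost/sakila
```

The returned string could be passed to `create_engine` in place of the hand-built `conn_str`.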

#### Populate MongoDB with Date Dimension Data
Upload the sakila_dim_date.json file to the `dim_date` collection in MongoDB. The collection is dropped and reloaded on each run, so this step can safely be run multiple times.

In [5]:
client = get_mongo_client(**mongodb_args)

# Build the path to the 'data' directory under this notebook's
# current working directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"dim_date" : 'sakila_dim_date.json'}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)
print("Date dimension uploaded to MongoDB successfully!")

Date dimension uploaded to MongoDB successfully!


### 1.0. Create and Populate the Dimension Tables
#### 1.1. Extract Date Dimension from MongoDB

In [6]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "dim_date"

df_dim_date = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_dim_date.head(2)

Unnamed: 0,DateKey,DateValue,DayOfMonth,Month,MonthName,Quarter,Year,WeekOfYear,DayOfWeek,DayName,IsWeekend
0,20050101,2005-01-01,1,1,January,1,2005,0,7,Saturday,1
1,20050102,2005-01-02,2,1,January,1,2005,0,1,Sunday,1


#### 1.2. Transform and Load Date Dimension to Data Warehouse

In [7]:
# Ensure DateValue is in proper datetime format
df_dim_date['DateValue'] = pd.to_datetime(df_dim_date['DateValue'])

# Load to data warehouse
dataframe = df_dim_date
table_name = 'dim_date'
primary_key = 'DateKey'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_dw_args)
print("Date dimension loaded to data warehouse successfully!")

Date dimension loaded to data warehouse successfully!
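
In the sample rows shown earlier, `DateKey` appears to be the date written as a `YYYYMMDD` integer (for example, 20050101 for 2005-01-01). Assuming that convention holds for the whole file, a key can be derived from any date, which is handy for spot-checking the dimension. The helper `date_to_key` is hypothetical.

```python
from datetime import date

def date_to_key(d):
    """Format a date as an integer YYYYMMDD key (assumed DateKey convention)."""
    return d.year * 10000 + d.month * 100 + d.day

print(date_to_key(date(2005, 1, 2)))  # 20050102
```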


#### 1.3. Extract Customer Dimension from MySQL Sakila Database

In [8]:
sql_customers = """
    SELECT 
        c.customer_id,
        c.first_name,
        c.last_name,
        c.email,
        c.active,
        ci.city,
        co.country
    FROM sakila.customer c
    INNER JOIN sakila.address a ON c.address_id = a.address_id
    INNER JOIN sakila.city ci ON a.city_id = ci.city_id
    INNER JOIN sakila.country co ON ci.country_id = co.country_id;
"""

df_customers = get_sql_dataframe(sql_customers, **mysql_sakila_args)
df_customers.head(2)

Unnamed: 0,customer_id,first_name,last_name,email,active,city,country
0,218,VERA,MCCOY,VERA.MCCOY@sakilacustomer.org,1,Kabul,Afghanistan
1,441,MARIO,CHEATHAM,MARIO.CHEATHAM@sakilacustomer.org,1,Batna,Algeria


#### 1.4. Transform Customer Dimension

In [9]:
# Insert a surrogate key
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))

# Convert the active flag (1/0) to the strings 'True'/'False'
df_customers['active'] = df_customers['active'].map({1: 'True', 0: 'False'})

df_customers.head(2)

Unnamed: 0,customer_key,customer_id,first_name,last_name,email,active,city,country
0,1,218,VERA,MCCOY,VERA.MCCOY@sakilacustomer.org,True,Kabul,Afghanistan
1,2,441,MARIO,CHEATHAM,MARIO.CHEATHAM@sakilacustomer.org,True,Batna,Algeria
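
The two transformations above can be tried on a toy DataFrame. This is a sketch with made-up rows, not the notebook's data; it also shows that `Series.map` with a dictionary yields `NaN` for any value missing from the mapping, which is worth knowing if the source column ever holds something other than 0 or 1.

```python
import pandas as pd

# Toy data standing in for the extracted customer rows.
df = pd.DataFrame({"customer_id": [218, 441, 69], "active": [1, 0, 2]})

# Surrogate key: a dense 1..N sequence, independent of the source customer_id.
df.insert(0, "customer_key", range(1, len(df) + 1))

# Values missing from the mapping (here 2) become NaN instead of raising.
df["active"] = df["active"].map({1: "True", 0: "False"})

print(df)
```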


#### 1.5. Load Customer Dimension to Data Warehouse

In [10]:
dataframe = df_customers
table_name = 'dim_customers'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_dw_args)
print("Customer dimension loaded successfully!")

Customer dimension loaded successfully!


#### 1.6. Extract Film Dimension from MySQL Sakila Database

In [11]:
sql_films = """
    SELECT 
        f.film_id,
        f.title,
        f.description,
        f.release_year,
        f.rental_duration,
        f.rental_rate,
        f.length,
        f.replacement_cost,
        f.rating,
        c.name AS category
    FROM sakila.film f
    LEFT JOIN sakila.film_category fc ON f.film_id = fc.film_id
    LEFT JOIN sakila.category c ON fc.category_id = c.category_id;
"""

df_films = get_sql_dataframe(sql_films, **mysql_sakila_args)
df_films.head(2)

Unnamed: 0,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,category
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,Documentary
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,Horror


#### 1.7. Transform Film Dimension

In [12]:
# Insert a surrogate key
df_films.insert(0, "film_key", range(1, df_films.shape[0]+1))

df_films.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,category
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,Documentary
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,Horror


#### 1.8. Load Film Dimension to Data Warehouse

In [13]:
dataframe = df_films
table_name = 'dim_films'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_dw_args)
print("Film dimension loaded successfully!")

Film dimension loaded successfully!


#### 1.9. Extract Staff Dimension from MySQL Sakila Database

In [14]:
sql_staff = """
    SELECT 
        s.staff_id,
        s.first_name,
        s.last_name,
        s.email,
        s.active,
        st.store_id,
        a.address,
        c.city,
        co.country
    FROM sakila.staff s
    INNER JOIN sakila.store st ON s.store_id = st.store_id
    INNER JOIN sakila.address a ON s.address_id = a.address_id
    INNER JOIN sakila.city c ON a.city_id = c.city_id
    INNER JOIN sakila.country co ON c.country_id = co.country_id;
"""

df_staff = get_sql_dataframe(sql_staff, **mysql_sakila_args)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,email,active,store_id,address,city,country
0,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,1,23 Workhaven Lane,Lethbridge,Canada
1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,2,1411 Lillydale Drive,Woodridge,Australia


#### 1.10. Transform Staff Dimension

In [15]:
# Insert a surrogate key
df_staff.insert(0, "staff_key", range(1, df_staff.shape[0]+1))

# Convert active to boolean string
df_staff['active'] = df_staff['active'].map({1: 'True', 0: 'False'})

df_staff.head(2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,email,active,store_id,address,city,country
0,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,True,1,23 Workhaven Lane,Lethbridge,Canada
1,2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,True,2,1411 Lillydale Drive,Woodridge,Australia


#### 1.11. Load Staff Dimension to Data Warehouse

In [16]:
dataframe = df_staff
table_name = 'dim_staff'
primary_key = 'staff_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_dw_args)
print("Staff dimension loaded successfully!")

Staff dimension loaded successfully!


#### 1.12. Extract Store Dimension from CSV File

In [17]:
# Read the CSV file
csv_file = os.path.join(data_dir, 'sakila_store.csv')
df_stores = pd.read_csv(csv_file)
df_stores.head(2)

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-15 04:57:12
1,2,2,2,2006-02-15 04:57:12


#### 1.13. Transform Store Dimension

In [18]:
# If store_id doesn't exist, create it; otherwise just add surrogate key
if 'store_id' not in df_stores.columns:
    df_stores.insert(0, 'store_id', range(1, df_stores.shape[0]+1))

# Insert a surrogate key
df_stores.insert(0, "store_key", range(1, df_stores.shape[0]+1))

df_stores.head(2)

Unnamed: 0,store_key,store_id,manager_staff_id,address_id,last_update
0,1,1,1,1,2006-02-15 04:57:12
1,2,2,2,2,2006-02-15 04:57:12


#### 1.14. Load Store Dimension to Data Warehouse

In [19]:
dataframe = df_stores
table_name = 'dim_stores'
primary_key = 'store_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_dw_args)
print("Store dimension loaded successfully!")

Store dimension loaded successfully!


#### 1.15. Validate Existence of All Dimension Tables

In [20]:
# Validate dim_date
sql_validate = "SELECT * FROM sakila_dw.dim_date LIMIT 5;"
df_validate = get_sql_dataframe(sql_validate, **mysql_dw_args)
print("\nDate Dimension:")
display(df_validate)


Date Dimension:


Unnamed: 0,DateKey,DateValue,DayOfMonth,Month,MonthName,Quarter,Year,WeekOfYear,DayOfWeek,DayName,IsWeekend
0,20050101,2005-01-01,1,1,January,1,2005,0,7,Saturday,1
1,20050102,2005-01-02,2,1,January,1,2005,0,1,Sunday,1
2,20050103,2005-01-03,3,1,January,1,2005,1,2,Monday,0
3,20050104,2005-01-04,4,1,January,1,2005,1,3,Tuesday,0
4,20050105,2005-01-05,5,1,January,1,2005,1,4,Wednesday,0


In [21]:
# Validate dim_customers
sql_validate = "SELECT * FROM sakila_dw.dim_customers LIMIT 5;"
df_validate = get_sql_dataframe(sql_validate, **mysql_dw_args)
print("\nCustomer Dimension:")
display(df_validate)


Customer Dimension:


Unnamed: 0,customer_key,customer_id,first_name,last_name,email,active,city,country
0,1,218,VERA,MCCOY,VERA.MCCOY@sakilacustomer.org,True,Kabul,Afghanistan
1,2,441,MARIO,CHEATHAM,MARIO.CHEATHAM@sakilacustomer.org,True,Batna,Algeria
2,3,69,JUDY,GRAY,JUDY.GRAY@sakilacustomer.org,True,Béchar,Algeria
3,4,176,JUNE,CARROLL,JUNE.CARROLL@sakilacustomer.org,True,Skikda,Algeria
4,5,320,ANTHONY,SCHWAB,ANTHONY.SCHWAB@sakilacustomer.org,True,Tafuna,American Samoa


In [22]:
# Validate dim_films
sql_validate = "SELECT * FROM sakila_dw.dim_films LIMIT 5;"
df_validate = get_sql_dataframe(sql_validate, **mysql_dw_args)
print("\nFilm Dimension:")
display(df_validate)


Film Dimension:


Unnamed: 0,film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,category
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,Documentary
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,Horror
2,3,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...,2006,7,2.99,50,18.99,NC-17,Documentary
3,4,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...,2006,5,2.99,117,26.99,G,Horror
4,5,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...,2006,6,2.99,130,22.99,G,Family


In [23]:
# Validate dim_stores
sql_validate = "SELECT * FROM sakila_dw.dim_stores LIMIT 5;"
df_validate = get_sql_dataframe(sql_validate, **mysql_dw_args)
print("\nStore Dimension:")
display(df_validate)


Store Dimension:


Unnamed: 0,store_key,store_id,manager_staff_id,address_id,last_update
0,1,1,1,1,2006-02-15 04:57:12
1,2,2,2,2,2006-02-15 04:57:12


### 2.0. Create and Populate the Fact Table
#### 2.1. Extract Rental and Payment Data from MySQL Sakila Database

In [24]:
sql_rentals = """
    SELECT 
        r.rental_id,
        r.rental_date,
        r.return_date,
        r.customer_id,
        i.film_id,
        i.store_id,
        p.payment_id,
        p.amount,
        p.payment_date
    FROM sakila.rental r
    INNER JOIN sakila.inventory i ON r.inventory_id = i.inventory_id
    LEFT JOIN sakila.payment p ON r.rental_id = p.rental_id;
"""

df_fact_rentals = get_sql_dataframe(sql_rentals, **mysql_sakila_args)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,rental_date,return_date,customer_id,film_id,store_id,payment_id,amount,payment_date
0,4863,2005-07-08 19:03:15,2005-07-11 21:29:15,431,1,1,11630,0.99,2005-07-08 19:03:15
1,11433,2005-08-02 20:13:10,2005-08-11 21:35:10,518,1,1,13956,3.99,2005-08-02 20:13:10


#### 2.2. Get Date Dimension from Data Warehouse for Date Key Lookups

In [25]:
sql_dim_date = "SELECT DateKey, DateValue FROM sakila_dw.dim_date;"
df_dim_date_lookup = get_sql_dataframe(sql_dim_date, **mysql_dw_args)
df_dim_date_lookup['DateValue'] = pd.to_datetime(df_dim_date_lookup['DateValue']).dt.date
df_dim_date_lookup.head(2)

Unnamed: 0,DateKey,DateValue
0,20050101,2005-01-01
1,20050102,2005-01-02


#### 2.3. Lookup Rental Date Keys

In [26]:
# Lookup rental_date_key
df_dim_rental_date = df_dim_date_lookup.rename(columns={"DateKey": "rental_date_key", "DateValue": "rental_date"})
df_fact_rentals['rental_date'] = pd.to_datetime(df_fact_rentals['rental_date']).dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_rental_date, on='rental_date', how='left')
df_fact_rentals.drop(['rental_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,return_date,customer_id,film_id,store_id,payment_id,amount,payment_date,rental_date_key
0,4863,2005-07-11 21:29:15,431,1,1,11630,0.99,2005-07-08 19:03:15,20050708
1,11433,2005-08-11 21:35:10,518,1,1,13956,3.99,2005-08-02 20:13:10,20050802


#### 2.4. Lookup Return Date Keys

In [27]:
# Lookup return_date_key
df_dim_return_date = df_dim_date_lookup.rename(columns={"DateKey": "return_date_key", "DateValue": "return_date"})
df_fact_rentals['return_date'] = pd.to_datetime(df_fact_rentals['return_date']).dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_return_date, on='return_date', how='left')
df_fact_rentals.drop(['return_date'], axis=1, inplace=True)
df_fact_rentals['return_date_key'] = df_fact_rentals['return_date_key'].astype('Int64')
df_fact_rentals.head(2)

Unnamed: 0,rental_id,customer_id,film_id,store_id,payment_id,amount,payment_date,rental_date_key,return_date_key
0,4863,431,1,1,11630,0.99,2005-07-08 19:03:15,20050708,20050711
1,11433,518,1,1,13956,3.99,2005-08-02 20:13:10,20050802,20050811


#### 2.5. Lookup Payment Date Keys

In [28]:
# Lookup payment_date_key
df_dim_payment_date = df_dim_date_lookup.rename(columns={"DateKey": "payment_date_key", "DateValue": "payment_date"})
df_fact_rentals['payment_date'] = pd.to_datetime(df_fact_rentals['payment_date']).dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_payment_date, on='payment_date', how='left')
df_fact_rentals.drop(['payment_date'], axis=1, inplace=True)
df_fact_rentals['payment_date_key'] = df_fact_rentals['payment_date_key'].astype('Int64')
df_fact_rentals.head(2)

Unnamed: 0,rental_id,customer_id,film_id,store_id,payment_id,amount,rental_date_key,return_date_key,payment_date_key
0,4863,431,1,1,11630,0.99,20050708,20050711,20050708
1,11433,518,1,1,13956,3.99,20050802,20050811,20050802
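
The `astype('Int64')` calls in the return and payment date lookups matter when a date is missing, for example a rental that has not been returned yet. A left merge then leaves a gap in the key column, which pandas stores as a float. The sketch below reproduces this on toy data (rental 9999 is made up) and shows the nullable `Int64` type restoring integer keys.

```python
import pandas as pd
from datetime import date

# Toy date lookup and fact rows; the second rental has no return date.
dim = pd.DataFrame({"return_date_key": [20050711],
                    "return_date": [date(2005, 7, 11)]})
fact = pd.DataFrame({"rental_id": [4863, 9999],
                     "return_date": [date(2005, 7, 11), None]})

merged = pd.merge(fact, dim, on="return_date", how="left")
print(merged["return_date_key"].dtype)  # float64, because one key is missing

# The nullable integer type keeps whole-number keys and marks the gap as <NA>.
merged["return_date_key"] = merged["return_date_key"].astype("Int64")
print(merged["return_date_key"].tolist())
```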


#### 2.6. Get Dimension Tables for Foreign Key Lookups

In [29]:
# Get customer keys
sql_dim_customers = "SELECT customer_key, customer_id FROM sakila_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_dim_customers, **mysql_dw_args)

# Get film keys
sql_dim_films = "SELECT film_key, film_id FROM sakila_dw.dim_films;"
df_dim_films = get_sql_dataframe(sql_dim_films, **mysql_dw_args)

# Get store keys
sql_dim_stores = "SELECT store_key, store_id FROM sakila_dw.dim_stores;"
df_dim_stores = get_sql_dataframe(sql_dim_stores, **mysql_dw_args)

print("Dimension keys retrieved successfully!")

Dimension keys retrieved successfully!


#### 2.7. Lookup Customer Keys

In [30]:
# Merge with customer dimension
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_customers, on='customer_id', how='left')
df_fact_rentals.drop(['customer_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,film_id,store_id,payment_id,amount,rental_date_key,return_date_key,payment_date_key,customer_key
0,4863,1,1,11630,0.99,20050708,20050711,20050708,35
1,11433,1,1,13956,3.99,20050802,20050811,20050802,518


#### 2.8. Lookup Film Keys

In [31]:
# Merge with film dimension
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_films, on='film_id', how='left')
df_fact_rentals.drop(['film_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,store_id,payment_id,amount,rental_date_key,return_date_key,payment_date_key,customer_key,film_key
0,4863,1,11630,0.99,20050708,20050711,20050708,35,1
1,11433,1,13956,3.99,20050802,20050811,20050802,518,1


#### 2.9. Lookup Store Keys

In [32]:
# Merge with store dimension
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_stores, on='store_id', how='left')
df_fact_rentals.drop(['store_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,payment_id,amount,rental_date_key,return_date_key,payment_date_key,customer_key,film_key,store_key
0,4863,11630,0.99,20050708,20050711,20050708,35,1,1
1,11433,13956,3.99,20050802,20050811,20050802,518,1,1
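
A left merge silently keeps fact rows whose natural key has no match in the dimension, and it duplicates fact rows if the dimension holds the same natural key twice. The sketch below, on made-up data, uses two `pd.merge` options to surface both problems: `validate="many_to_one"` raises a `MergeError` when the dimension key is not unique, and `indicator=True` adds a `_merge` column that flags unmatched rows.

```python
import pandas as pd

# Toy fact and dimension rows; store_id 3 has no entry in the dimension.
fact = pd.DataFrame({"rental_id": [1, 2, 3], "store_id": [1, 2, 3]})
dim_stores = pd.DataFrame({"store_key": [1, 2], "store_id": [1, 2]})

merged = pd.merge(fact, dim_stores, on="store_id", how="left",
                  validate="many_to_one", indicator=True)

# Rows marked "left_only" found no matching store_key.
unmatched = merged[merged["_merge"] == "left_only"]
print(unmatched[["rental_id", "store_id"]])
```

The same check could be applied to the customer and film lookups before the natural key columns are dropped.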


#### 2.10. Transform Fact Table - Add Surrogate Key and Reorder Columns

In [33]:
# Insert surrogate primary key
df_fact_rentals.insert(0, "rental_key", range(1, df_fact_rentals.shape[0]+1))

# Reorder columns for clarity
ordered_columns = ['rental_key', 'rental_id', 'payment_id', 
                   'customer_key', 'film_key', 'store_key',
                   'rental_date_key', 'return_date_key', 'payment_date_key',
                   'amount']

df_fact_rentals = df_fact_rentals[ordered_columns]
df_fact_rentals.head(2)

Unnamed: 0,rental_key,rental_id,payment_id,customer_key,film_key,store_key,rental_date_key,return_date_key,payment_date_key,amount
0,1,4863,11630,35,1,1,20050708,20050711,20050708,0.99
1,2,11433,13956,518,1,1,20050802,20050811,20050802,3.99


#### 2.11. Load Fact Table to Data Warehouse

In [34]:
dataframe = df_fact_rentals
table_name = 'fact_rentals'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_dw_args)
print("Fact rentals table loaded successfully!")

Fact rentals table loaded successfully!


#### 2.12. Validate Fact Table

In [35]:
sql_fact_rentals = "SELECT * FROM sakila_dw.fact_rentals LIMIT 10;"
df_fact_validated = get_sql_dataframe(sql_fact_rentals, **mysql_dw_args)
print("\nFact Rentals Table:")
display(df_fact_validated)


Fact Rentals Table:


Unnamed: 0,rental_key,rental_id,payment_id,customer_key,film_key,store_key,rental_date_key,return_date_key,payment_date_key,amount
0,1,4863,11630,35,1,1,20050708,20050711,20050708,0.99
1,2,11433,13956,518,1,1,20050802,20050811,20050802,3.99
2,3,14714,7578,260,1,1,20050821,20050830,20050821,3.99
3,4,972,11124,239,1,1,20050530,20050606,20050530,1.99
4,5,2117,4607,232,1,1,20050617,20050623,20050617,0.99
5,6,4187,4381,265,1,1,20050707,20050711,20050707,0.99
6,7,9449,15579,333,1,1,20050730,20050806,20050730,1.99
7,8,15453,9707,9,1,1,20050823,20050830,20050823,1.99
8,9,10126,1088,62,1,1,20050731,20050803,20050731,0.99
9,10,15421,14575,270,1,1,20050822,20050825,20050822,0.99


### 3.0. Demonstrate Data Warehouse Functionality
#### 3.1. Query 1: Total Revenue and Rental Count by Customer
This query demonstrates aggregation (count and sum) across the fact table and one dimension (customers).
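
The pattern, joining the fact table to a dimension and grouping by the dimension's attributes, can also be tried without a MySQL server. The sketch below uses an in-memory SQLite database with made-up rows; the table names mirror the data mart, but the columns are trimmed and the data is purely illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_customers (customer_key INTEGER PRIMARY KEY, first_name TEXT);
    CREATE TABLE fact_rentals (rental_key INTEGER PRIMARY KEY, rental_id INTEGER,
                               customer_key INTEGER, amount REAL);
    INSERT INTO dim_customers VALUES (1, 'ANA'), (2, 'TIM');
    INSERT INTO fact_rentals VALUES (1, 10, 1, 0.99), (2, 11, 1, 2.99), (3, 12, 2, 4.99);
""")

# Count rentals and sum revenue per customer, highest revenue first.
rows = conn.execute("""
    SELECT c.first_name,
           COUNT(f.rental_id) AS total_rentals,
           SUM(f.amount) AS total_revenue
    FROM fact_rentals f
    INNER JOIN dim_customers c ON f.customer_key = c.customer_key
    GROUP BY c.customer_key, c.first_name
    ORDER BY total_revenue DESC;
""").fetchall()
conn.close()

for first_name, total_rentals, total_revenue in rows:
    print(first_name, total_rentals, round(total_revenue, 2))
```

The query against the actual data mart follows.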

In [36]:
sql_customer_analysis = """
    SELECT 
        c.first_name,
        c.last_name,
        c.city,
        c.country,
        COUNT(f.rental_id) AS total_rentals,
        SUM(f.amount) AS total_revenue
    FROM sakila_dw.fact_rentals f
    INNER JOIN sakila_dw.dim_customers c ON f.customer_key = c.customer_key
    GROUP BY c.customer_key, c.first_name, c.last_name, c.city, c.country
    ORDER BY total_revenue DESC
    LIMIT 15;
"""

df_customer_analysis = get_sql_dataframe(sql_customer_analysis, **mysql_dw_args)
print("\nTop 15 Customers by Revenue:")
display(df_customer_analysis)


Top 15 Customers by Revenue:


Unnamed: 0,first_name,last_name,city,country,total_rentals,total_revenue
0,KARL,SEAL,Cape Coral,United States,45,221.55
1,ELEANOR,HUNT,Saint-Denis,Réunion,46,216.54
2,CLARA,SHAW,Molodetšno,Belarus,42,195.58
3,RHONDA,KENNEDY,Apeldoorn,Netherlands,39,194.61
4,MARION,SNYDER,Santa Bárbara d´Oeste,Brazil,39,194.61
5,TOMMY,COLLAZO,Qomsheh,Iran,38,186.62
6,WESLEY,BULL,Ourense (Orense),Spain,40,177.6
7,TIM,CARY,Bijapur,India,39,175.61
8,MARCIA,DEAN,Tanza,Philippines,42,175.58
9,ANA,BRADLEY,Memphis,United States,34,174.66
