### Kyle Tran - DS 2002 Final Project (Computing ID: yzn3hv)
This project focuses on creating a data lakehouse for the Sakila sample SQL dataset. It builds upon the first project, using Azure Databricks to populate a dimensional data lakehouse from three different sources: an Azure MySQL server, MongoDB Atlas, and DBFS.

The three dimensions I chose were: Customer, Rental, and Staff (**NOTE:** ignore the film and inventory dimensions; those were additional ones I made during the midterm, which I now realize don't fit my fact_payments table)

- Step 1: Setup
- Step 2: Fetch from Azure MySQL server (customer dimension)
- Step 3: Fetch from MongoDB Atlas (rental dimension)
    - Note: I also fetched the film and inventory dimensions for extra querying in the gold aggregation table, but they aren't part of the fact_payments table
- Step 4: Fetch from File System (csv file) (staff dimension)
- Step 5: Integrating Reference Data with Real-Time Data (fact_payments table)
    - Using AutoLoader to Process Streaming (Hot Path) Payments Fact Data [Creating Bronze, Silver, and Gold Tables]

#### 1.0 Setup + Importing Necessary Libraries

In [0]:
# importing libraries
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

##### 1.1 Defining parameters for connecting to Azure MYSQL/MongoDB

In [0]:
# Azure MySQL Server Connection Information ###################
# NOTE(review): the user names and passwords in this cell are hard-coded in
# plain text. Consider loading them from a secret store (e.g. a Databricks
# secret scope) before sharing or committing this notebook.
# NOTE(review): the %sql JDBC views later in this notebook repeat the host,
# port, database, user and password inline rather than using these names.
jdbc_hostname = "yzn3hv-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "ktran",
  "password" : "Password!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
# Passed to set_mongo_collection() when the JSON dimension files are uploaded.
atlas_cluster_name = "mongodbintro.gltgp"
atlas_database_name = "sakila_dw"
atlas_user_name = "kylesethtran"
atlas_password = "Password"

# Data Files (JSON) Information ###############################
# Name of the destination lakehouse database.
dst_database = "sakila_dlh"

# Root DBFS folder for the project, and the folder backing the lakehouse database.
base_dir = "dbfs:/FileStore/project_data"
database_dir = f"{base_dir}/{dst_database}"

# Source data: "batch" holds the dimension files, "stream" holds the payment
# fact files that are read as a stream in section 5.
data_dir = f"{base_dir}/sakila"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# Output folders for the bronze/silver/gold layers of the payments fact data;
# the bronze and silver folders also hold the streaming checkpoints.
payments_output_bronze = f"{database_dir}/fact_payments/bronze"
payments_output_silver = f"{database_dir}/fact_payments/silver"
payments_output_gold   = f"{database_dir}/fact_payments/gold"

# Delete the Streaming Files ##################################
# Second argument True = recursive delete, so every run starts from scratch.
dbutils.fs.rm(f"{database_dir}/fact_payments", True)


# Delete the Database Files ###################################
# NOTE(review): database_dir is the parent of the fact_payments folder removed
# above, so this recursive delete already covers it.
dbutils.fs.rm(database_dir, True)

True

##### 1.2 Defining Methods for Getting Data from MongoDB (Sourced from Lab 6)

In [0]:
###################################################################################################################
# Here, I am referencing the methods provided in Lab 6 to allow us to fetch a dataFrame from the MongoDB Atlas database server Using PyMongo.
###################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Query a MongoDB Atlas collection and return the documents as a DataFrame.

    Args:
        user_id: Atlas user name (plain text; it is percent-escaped here).
        pwd: Atlas password (plain text; it is percent-escaped here).
        cluster_name: Cluster host prefix; ".mongodb.net" is appended.
        db_name: Database to connect to and query.
        collection: Name of the collection to read.
        conditions: Query filter; a falsy value means "no filter".
        projection: Field projection; a falsy value means "all fields".
        sort: Sort specification handed to ``Cursor.sort``; a falsy value
            means "no sorting".

    Returns:
        A DataFrame built with the module-level ``pd`` alias from the list of
        matching documents.

    Note:
        Each of ``conditions``, ``projection`` and ``sort`` is applied whenever
        it is supplied. Previously they only took effect in specific
        combinations, so e.g. a filter without a projection silently returned
        the whole collection.
    """
    # Local import keeps this function self-contained within the notebook cell.
    from urllib.parse import quote_plus

    # Credentials must be percent-escaped, otherwise characters such as
    # '@', ':' or '/' in the password produce an invalid connection URI.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )

    client = pymongo.MongoClient(mongo_uri)
    try:
        cursor = client[db_name][collection].find(conditions or None, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialize the cursor before the connection is closed.
        dframe = pd.DataFrame(list(cursor))
    finally:
        # Always release the connection, even when the query fails.
        client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """Recreate MongoDB collections from JSON files.

    Args:
        user_id: Atlas user name (plain text; it is percent-escaped here).
        pwd: Atlas password (plain text; it is percent-escaped here).
        cluster_name: Cluster host prefix; ".mongodb.net" is appended.
        db_name: Database in which the collections are (re)created.
        src_file_path: Local directory that contains the JSON files.
        json_files: Mapping of collection name -> JSON file name. Each file
            is expected to hold a list of documents for ``insert_many``.

    Returns:
        The ``insert_many`` result of the last collection loaded, or ``None``
        when ``json_files`` is empty.
    """
    # Local import keeps this function self-contained within the notebook cell.
    from urllib.parse import quote_plus

    # Credentials must be percent-escaped, otherwise characters such as
    # '@', ':' or '/' in the password produce an invalid connection URI.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )
    client = pymongo.MongoClient(mongo_uri)

    # Defined up front so an empty json_files mapping returns None instead of
    # raising UnboundLocalError at the return statement.
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)

            # Drop only after the file was read and parsed successfully, so a
            # missing or malformed file does not wipe the existing collection.
            db.drop_collection(collection_name)
            result = db[collection_name].insert_many(documents)
    finally:
        # Always release the connection, even when a load fails.
        client.close()

    return result

##### 1.3 Creating Empty Sakila Data Lakehouse

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project sakila database"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### 1.4 Extracting Date Dimension from MySQL database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://yzn3hv-mysql.mysql.database.azure.com:3306/sakila_dw", 
  dbtable "dim_date",
  user "ktran",    
  password "Password!"
)

In [0]:
%sql 
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### 2.0 Creating New Customer Table from an Azure MySQL database. (Source 1)

###### a. Create Customer Dimension Table (From MySQL)

In [0]:
%sql
-- Create a Temporary View named "view_customer" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://yzn3hv-mysql.mysql.database.azure.com:3306/sakila_dw", 
  dbtable "dim_customers",
  user "ktran",    
  password "Password!"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_customers
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh/dim_customers"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customers;

col_name,data_type,comment
customer_key,bigint,
customer_id,bigint,
store_id,bigint,
first_name,varchar(65535),
last_name,varchar(65535),
email,varchar(65535),
address_id,bigint,
active,bigint,
customer_create_date_key,bigint,
customer_last_update_key,bigint,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customers LIMIT 5

customer_key,customer_id,store_id,first_name,last_name,email,address_id,active,customer_create_date_key,customer_last_update_key
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,20060214,20060215
2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,20060214,20060215
3,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,20060214,20060215
4,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,20060214,20060215
5,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,20060214,20060215


#### 3.0 Fetching Data from MongoDB for Rental Dimension (Source 2)

###### a. Creating a new MongoDB database with the Rental Dimension (pretending the data originates there) (Note: I do work for the film and inventory dimensions as well, but realized they aren't part of the fact table, so disregard them)

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/project_data/sakila/batch'

path,name,size,modificationTime
dbfs:/FileStore/project_data/sakila/batch/dim_films.json,dim_films.json,477424,1733260412000
dbfs:/FileStore/project_data/sakila/batch/dim_inventory.json,dim_inventory.json,133294,1733260412000
dbfs:/FileStore/project_data/sakila/batch/dim_rentals.json,dim_rentals.json,226368,1733373774000
dbfs:/FileStore/project_data/sakila/batch/dim_staff.csv,dim_staff.csv,235,1733260412000


In [0]:
# Local-file form of the batch folder (/dbfs/... rather than dbfs:/...):
# set_mongo_collection() reads each file with Python's built-in open().
source_dir = '/dbfs/FileStore/project_data/sakila/batch'
# Maps the MongoDB collection name to the JSON file that populates it.
json_files = {"films" : 'dim_films.json', 
              "inventory" : 'dim_inventory.json',
              "rentals" : 'dim_rentals.json',}

# Drops and re-creates each collection listed above; the value shown as the
# cell output is the insert_many result returned by the function.
set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7fd89efe5d40>

###### b. Fetching Rental Dimension and Creating New Dimension Table in sakila_dlh

In [0]:
%scala
import com.mongodb.spark._

// NOTE(review): these three values repeat atlas_user_name, atlas_password and
// atlas_cluster_name from the Python setup cell and are hard-coded in plain
// text; keep them in sync, or load them from a secret store.
val userName = "kylesethtran"
val pwd = "Password"
val clusterName = "mongodbintro.gltgp"
// Connection string used as "spark.mongodb.input.uri" by the MongoDB reads below.
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_rental = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw")
.option("collection", "rentals").load()
.select("rental_key", "rental_id", "inventory_id", "customer_id", "return_date", "staff_id", "rental_date_key", "rental_last_update_key")

display(df_rental)

rental_key,rental_id,inventory_id,customer_id,return_date,staff_id,rental_date_key,rental_last_update_key
1,1,367,130,2005-05-26 22:04:30,1,20050524,20060215
2,2,1525,459,2005-05-28 19:40:33,1,20050524,20060215
3,3,1711,408,2005-06-01 22:12:39,1,20050524,20060215
4,4,2452,333,2005-06-03 01:43:41,2,20050524,20060215
5,5,2079,222,2005-06-02 04:33:21,1,20050524,20060215
6,6,2792,549,2005-05-27 01:32:07,1,20050524,20060215
7,7,3995,269,2005-05-29 20:34:53,2,20050524,20060215
8,8,2346,239,2005-05-27 23:33:46,2,20050524,20060215
9,9,2580,126,2005-05-28 00:22:40,1,20050525,20060215
10,10,1824,399,2005-05-31 22:44:21,2,20050525,20060215


In [0]:
%scala
df_rental.printSchema()

In [0]:
%scala
/** Here, we are creating a new Dimension Table in Databricks Metadata database (sakila_dlh) **/
df_rental.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_rentals") 

In [0]:
%sql
/** Now, we are verifying that it was created **/
DESCRIBE EXTENDED sakila_dlh.dim_rentals

col_name,data_type,comment
rental_key,int,
rental_id,int,
inventory_id,int,
customer_id,int,
return_date,string,
staff_id,int,
rental_date_key,int,
rental_last_update_key,int,
,,
# Delta Statistics Columns,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_rentals LIMIT 5

rental_key,rental_id,inventory_id,customer_id,return_date,staff_id,rental_date_key,rental_last_update_key
1,1,367,130,2005-05-26 22:04:30,1,20050524,20060215
2,2,1525,459,2005-05-28 19:40:33,1,20050524,20060215
3,3,1711,408,2005-06-01 22:12:39,1,20050524,20060215
4,4,2452,333,2005-06-03 01:43:41,2,20050524,20060215
5,5,2079,222,2005-06-02 04:33:21,1,20050524,20060215


###### NOTE: The steps below are for fetching the film and inventory dimensions. However, they aren't part of the fact table used later, so disregard them. The rental dimension fulfills the MongoDB requirement.

###### c. Fetching Films and Inventory Dimensions from Newly Created MongoDB database (extra work)

> ###### i. Fetching from Films Dimension & Creating New Dimension Table in Databricks Metadata database (sakila_dlh)

In [0]:
%scala

val df_film = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw")
.option("collection", "films").load()
.select("film_key", "title", "description", "release_year", "language_id",
        "original_language_id","rental_duration", "rental_rate", "length", "replacement_cost","rating","special_features", "film_last_update_key")

display(df_film)

film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,film_last_update_key
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",20060215
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",20060215
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,0,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",20060215
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,0,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",20060215
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,0,6,2.99,130,22.99,G,Deleted Scenes,20060215
6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,1,0,3,2.99,169,17.99,PG,Deleted Scenes,20060215
7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,1,0,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes",20060215
8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,1,0,6,4.99,54,15.99,R,Trailers,20060215
9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,1,0,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes",20060215
10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,1,0,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes",20060215


In [0]:
%scala
df_film.printSchema()

In [0]:
%scala
/** Here, we are creating a new Dimension Table in Databricks Metadata database (sakila_dlh) **/
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_films") 

In [0]:
%sql
/** Now, we are verifying that it was created **/
DESCRIBE EXTENDED sakila_dlh.dim_films

col_name,data_type,comment
film_key,int,
title,string,
description,string,
release_year,string,
language_id,int,
original_language_id,int,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_films LIMIT 5

film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,film_last_update_key
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",20060215
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",20060215
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,0,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",20060215
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,0,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",20060215
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,0,6,2.99,130,22.99,G,Deleted Scenes,20060215


> ###### ii. Fetching from Inventory Dimension

In [0]:
%scala

val df_inventory = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw")
.option("collection", "inventory").load()
.select("inventory_key", "film_id", "store_id", "inventory_last_update_key")

display(df_inventory)

inventory_key,film_id,store_id,inventory_last_update_key
1,1,1,20060215
2,1,1,20060215
3,1,1,20060215
4,1,1,20060215
5,1,2,20060215
6,1,2,20060215
7,1,2,20060215
8,1,2,20060215
9,2,2,20060215
10,2,2,20060215


In [0]:
%scala
df_inventory.printSchema()

In [0]:
%scala
/** Here, we are creating a new Dimension Table in Databricks Metadata database (sakila_dlh) **/
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
/** Now, we are verifying that the table was created **/
DESCRIBE EXTENDED sakila_dlh.dim_inventory

col_name,data_type,comment
inventory_key,int,
film_id,int,
store_id,int,
inventory_last_update_key,int,
,,
# Delta Statistics Columns,,
Column Names,"inventory_key, film_id, store_id, inventory_last_update_key",
Column Selection Method,first-32,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

inventory_key,film_id,store_id,inventory_last_update_key
1,1,1,20060215
2,1,1,20060215
3,1,1,20060215
4,1,1,20060215
5,1,2,20060215


#### 4.0 Fetching from a Filesystem (DBFS w/csv file, source 3)

In [0]:
# Load the staff dimension from the CSV file in the batch folder with PySpark.
# The first row is treated as the header and column types are inferred.
staff_csv = f"{batch_dir}/dim_staff.csv"

staff_reader = spark.read.format('csv').options(header='true', inferSchema='true')
df_staff = staff_reader.load(staff_csv)
display(df_staff)

staff_key,staff_id,first_name,last_name,address_id,email,store_id,active,username,staff_last_update_key
1,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,20060215
2,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,20060215


In [0]:
df_staff.printSchema()

root
 |-- staff_key: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- username: string (nullable = true)
 |-- staff_last_update_key: integer (nullable = true)



In [0]:
# Here, we are creating the new staff dimension table for sakila_dlh
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql 
DESCRIBE EXTENDED sakila_dlh.dim_staff

col_name,data_type,comment
staff_key,int,
staff_id,int,
first_name,string,
last_name,string,
address_id,int,
email,string,
store_id,int,
active,int,
username,string,
staff_last_update_key,int,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5

staff_key,staff_id,first_name,last_name,address_id,email,store_id,active,username,staff_last_update_key
1,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,20060215
2,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,20060215


#### 5.0 Using AutoLoader to Process Streaming (Hot Path) Payments Fact Data

In [0]:
%sql
-- Before streaming data, here we are verifying the dimensions --
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customers,False
sakila_dlh,dim_date,False
sakila_dlh,dim_films,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_rentals,False
sakila_dlh,dim_staff,False
,_sqldf,True
,display_query_4,True
,display_query_5,True
,display_query_6,True


###### a. Creating Bronze Table for Raw Data (reading Fact table)

In [0]:
# Open an Auto Loader ("cloudFiles") stream over the raw JSON payment files in
# stream_dir and expose it to SQL as the temp view "payments_raw_tempview".
autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": payments_output_bronze,
    "cloudFiles.inferColumnTypes": "true",
    "multiLine": "true",
}

payments_raw_stream = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load(stream_dir)
)
payments_raw_stream.createOrReplaceTempView("payments_raw_tempview")

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW payments_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM payments_raw_tempview
)

In [0]:
%sql
SELECT * FROM payments_bronze_tempview

amount,customer_key,fact_payment_key,payment_date_key,payment_id,payment_last_update_key,rental_key,staff_key,_rescued_data,receipt_time,source_file
4.99,25,667,20050712,668,20060215,6650,1,,2024-12-05T16:49:18.739Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments3.json
2.99,25,668,20050712,669,20060215,6902,2,,2024-12-05T16:49:18.739Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments3.json
2.99,25,669,20050729,670,20060215,8664,2,,2024-12-05T16:49:18.739Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments3.json
0.99,25,670,20050730,671,20060215,8875,2,,2024-12-05T16:49:18.739Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments3.json
8.99,25,671,20050730,672,20060215,9137,1,,2024-12-05T16:49:18.739Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments3.json
2.99,25,672,20050730,673,20060215,9331,2,,2024-12-05T16:49:18.739Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments3.json
2.99,25,673,20050731,674,20060215,9918,2,,2024-12-05T16:49:18.739Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments3.json
2.99,25,674,20050731,675,20060215,10099,2,,2024-12-05T16:49:18.739Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments3.json
5.99,25,675,20050801,676,20060215,10320,1,,2024-12-05T16:49:18.739Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments3.json
2.99,25,676,20050802,677,20060215,10856,2,,2024-12-05T16:49:18.739Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments3.json


In [0]:
# Creating Bronze Table w/append command
# Streams the rows of payments_bronze_tempview (raw payments plus the
# receipt_time / source_file columns added in the SQL cell above) into the
# Delta table "fact_payments_bronze". The expression evaluates to the
# StreamingQuery handle shown as the cell output.
(spark.table("payments_bronze_tempview")
      .writeStream
      .format("delta")
      # Streaming checkpoint kept under the bronze output folder.
      .option("checkpointLocation", f"{payments_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_payments_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fd8bb4c11d0>

###### b. Creating Silver Table where reference data is included

In [0]:
# Read the bronze Delta table back as a stream and expose it to SQL as
# "payments_silver_tempview", the input of the silver join below.
(spark.readStream
  .table("fact_payments_bronze")
  .createOrReplaceTempView("payments_silver_tempview"))

In [0]:
%sql
SELECT * FROM payments_silver_tempview

amount,customer_key,fact_payment_key,payment_date_key,payment_id,payment_last_update_key,rental_key,staff_key,_rescued_data,receipt_time,source_file
2.99,1,1,20050525,1,20060215,76,1,,2024-12-05T16:49:40.637Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments1.json
0.99,1,2,20050528,2,20060215,572,1,,2024-12-05T16:49:40.637Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments1.json
5.99,1,3,20050615,3,20060215,1184,1,,2024-12-05T16:49:40.637Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments1.json
0.99,1,4,20050615,4,20060215,1421,2,,2024-12-05T16:49:40.637Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments1.json
9.99,1,5,20050615,5,20060215,1475,2,,2024-12-05T16:49:40.637Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments1.json
4.99,1,6,20050616,6,20060215,1724,1,,2024-12-05T16:49:40.637Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments1.json
4.99,1,7,20050618,7,20060215,2306,1,,2024-12-05T16:49:40.637Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments1.json
0.99,1,8,20050618,8,20060215,2361,2,,2024-12-05T16:49:40.637Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments1.json
3.99,1,9,20050621,9,20060215,3282,1,,2024-12-05T16:49:40.637Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments1.json
5.99,1,10,20050708,10,20060215,4524,2,,2024-12-05T16:49:40.637Z,dbfs:/FileStore/project_data/sakila/stream/fact_payments1.json


In [0]:
%sql
DESCRIBE EXTENDED payments_silver_tempview

col_name,data_type,comment
amount,double,
customer_key,bigint,
fact_payment_key,bigint,
payment_date_key,bigint,
payment_id,bigint,
payment_last_update_key,bigint,
rental_key,bigint,
staff_key,bigint,
_rescued_data,string,
receipt_time,timestamp,


In [0]:
%sql
USE sakila_dlh;
-- Query for joining dimension data (reference data) with fact table
-- Each streamed payment row (p) is enriched with customer (c), rental (r) and
-- staff (s) attributes. The INNER JOINs drop any payment whose customer_key,
-- rental_key or staff_key has no match in the corresponding dimension table.
CREATE OR REPLACE TEMPORARY VIEW fact_payments_silver_tempview AS (
  SELECT p.fact_payment_key As Fact_Payment_Key,
      p.payment_id As Payment_ID,
      p.customer_key AS Customer_Key,
      c.first_name AS Customer_First_Name,
      c.last_name AS Customer_Last_Name,
      c.email AS Customer_Email,
      c.active AS Customer_Active_Status,
      p.rental_key As Rental_Key,
      r.rental_date_key AS Rental_Day_Key,
      p.staff_key AS Staff_Key,
      s.first_name AS Staff_First_Name,
      s.last_name AS Staff_Last_Name,
      s.username AS Staff_Username,
      s.active AS Staff_Active_Status,
      s.email AS Staff_Email,
      p.amount AS Payment_Amount
  FROM payments_silver_tempview AS p
  INNER JOIN sakila_dlh.dim_customers AS c
  ON p.customer_key = c.customer_key
  INNER JOIN sakila_dlh.dim_rentals AS r
  ON p.rental_key = r.rental_key
  INNER JOIN sakila_dlh.dim_staff AS s
  ON p.staff_key = s.staff_key
  -- NOTE(review): no column of rd is selected above, so this join contributes
  -- nothing to the output (assuming dim_date.date_key is unique - TODO confirm).
  -- Either select date attributes from rd or remove the join.
  LEFT OUTER JOIN sakila_dlh.dim_date as rd
  ON r.rental_date_key = rd.date_key
)

In [0]:
# Creating Silver Table w/append command
# Streams the joined rows of fact_payments_silver_tempview into the Delta
# table "fact_payments_silver". The expression evaluates to the StreamingQuery
# handle shown as the cell output.
(spark.table("fact_payments_silver_tempview")
      .writeStream
      .format("delta")
      # Streaming checkpoint kept under the silver output folder.
      .option("checkpointLocation", f"{payments_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_payments_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fd8bb4ea410>

In [0]:
%sql
-- Here, we are verifying the table was created --
SELECT * FROM fact_payments_silver

Fact_Payment_Key,Payment_ID,Customer_Key,Customer_First_Name,Customer_Last_Name,Customer_Email,Customer_Active_Status,Rental_Key,Rental_Day_Key,Staff_Key,Staff_First_Name,Staff_Last_Name,Staff_Username,Staff_Active_Status,Staff_Email,Payment_Amount
984,985,36,KATHLEEN,ADAMS,KATHLEEN.ADAMS@sakilacustomer.org,1,715,20050529,1,Mike,Hillyer,Mike,1,Mike.Hillyer@sakilastaff.com,0.99
983,984,36,KATHLEEN,ADAMS,KATHLEEN.ADAMS@sakilacustomer.org,1,348,20050527,1,Mike,Hillyer,Mike,1,Mike.Hillyer@sakilastaff.com,0.99
448,449,17,DONNA,THOMPSON,DONNA.THOMPSON@sakilacustomer.org,1,883,20050530,2,Jon,Stephens,Jon,1,Jon.Stephens@sakilastaff.com,4.99
494,495,19,RUTH,MARTINEZ,RUTH.MARTINEZ@sakilacustomer.org,1,695,20050529,2,Jon,Stephens,Jon,1,Jon.Stephens@sakilastaff.com,2.99
447,448,17,DONNA,THOMPSON,DONNA.THOMPSON@sakilacustomer.org,1,579,20050528,1,Mike,Hillyer,Mike,1,Mike.Hillyer@sakilastaff.com,2.99
469,470,18,CAROL,GARCIA,CAROL.GARCIA@sakilacustomer.org,1,691,20050529,1,Mike,Hillyer,Mike,1,Mike.Hillyer@sakilastaff.com,4.99
493,494,19,RUTH,MARTINEZ,RUTH.MARTINEZ@sakilacustomer.org,1,590,20050528,2,Jon,Stephens,Jon,1,Jon.Stephens@sakilastaff.com,2.99
580,581,22,LAURA,RODRIGUEZ,LAURA.RODRIGUEZ@sakilacustomer.org,1,819,20050529,2,Jon,Stephens,Jon,1,Jon.Stephens@sakilastaff.com,8.99
446,447,17,DONNA,THOMPSON,DONNA.THOMPSON@sakilacustomer.org,1,287,20050526,2,Jon,Stephens,Jon,1,Jon.Stephens@sakilastaff.com,2.99
468,469,18,CAROL,GARCIA,CAROL.GARCIA@sakilacustomer.org,1,116,20050525,1,Mike,Hillyer,Mike,1,Mike.Hillyer@sakilastaff.com,4.99


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_payments_silver

col_name,data_type,comment
Fact_Payment_Key,bigint,
Payment_ID,bigint,
Customer_Key,bigint,
Customer_First_Name,varchar(65535),
Customer_Last_Name,varchar(65535),
Customer_Email,varchar(65535),
Customer_Active_Status,bigint,
Rental_Key,bigint,
Rental_Day_Key,int,
Staff_Key,bigint,


###### c. Performing Gold Aggregations

###### The first query I will perform gets the number of payments each customer made on rentals and the total amount spent on those rentals

In [0]:
%sql
-- Gold aggregation: the number of payments each customer made on rentals and
-- the total amount spent on those rentals, computed from the silver table.
CREATE OR REPLACE TABLE sakila_dlh.fact_payments_gold AS (
  SELECT src.`Customer_First_Name`,
        src.`Customer_Last_Name`,
        src.`Customer_Email`,
        SUM(src.`Payment_Amount`) as Total_Amount,
        COUNT(*) AS Number_of_Payments
  FROM sakila_dlh.fact_payments_silver as src
  GROUP BY src.`Customer_First_Name`, src.`Customer_Last_Name`, src.`Customer_Email`
);

-- Sort when reading: the row order of a stored table is not guaranteed, so an
-- ORDER BY inside the CREATE TABLE ... AS SELECT does not reliably order this output.
SELECT * FROM sakila_dlh.fact_payments_gold
ORDER BY `Customer_First_Name`, `Customer_Last_Name`;

Customer_First_Name,Customer_Last_Name,Customer_Email,Total_Amount,Number_of_Payments
AMY,LOPEZ,AMY.LOPEZ@sakilacustomer.org,9.98,2
ANGELA,HERNANDEZ,ANGELA.HERNANDEZ@sakilacustomer.org,1.99,1
ANNA,HILL,ANNA.HILL@sakilacustomer.org,2.99,1
BETTY,WHITE,BETTY.WHITE@sakilacustomer.org,20.95,5
CAROL,GARCIA,CAROL.GARCIA@sakilacustomer.org,12.97,3
CYNTHIA,YOUNG,CYNTHIA.YOUNG@sakilacustomer.org,5.98,2
DEBORAH,WALKER,DEBORAH.WALKER@sakilacustomer.org,7.99,1
DONNA,THOMPSON,DONNA.THOMPSON@sakilacustomer.org,10.97,3
ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,0.99,1
JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,8.97,3


###### The second query I will perform gets the number of payments each staff member has processed

In [0]:
%sql
-- Gold aggregation: the number of payments each staff member has processed,
-- computed from the silver table.
-- Stored under its own table name: writing it to sakila_dlh.fact_payments_gold
-- with CREATE OR REPLACE would overwrite the per-customer aggregation that the
-- previous gold query stores in that table.
CREATE OR REPLACE TABLE sakila_dlh.fact_payments_gold_staff AS (
  SELECT src.`Staff_First_Name`,
        src.`Staff_Last_Name`,
        src.`Staff_Email`,
        COUNT(*) AS Total_Number_Of_Payments_Processed
  FROM sakila_dlh.fact_payments_silver as src
  GROUP BY src.`Staff_First_Name`, src.`Staff_Last_Name`, src.`Staff_Email`
);

-- Sort when reading: the row order of a stored table is not guaranteed, so an
-- ORDER BY inside the CREATE TABLE ... AS SELECT does not reliably order this output.
SELECT * FROM sakila_dlh.fact_payments_gold_staff
ORDER BY `Staff_First_Name`, `Staff_Last_Name`;

Staff_First_Name,Staff_Last_Name,Staff_Email,Total_Number_Of_Payments_Processed
Jon,Stephens,Jon.Stephens@sakilastaff.com,27
Mike,Hillyer,Mike.Hillyer@sakilastaff.com,35
