### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # Uses Koalas that is included in PySpark version 3.2 or newer
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables
##### 2.1. Create retail, batch, and streaming directories

In [0]:
# Azure MySQL Server Connection Information ########################
jdbc_hostname = "ds2002-mysql-kcm7zp.mysql.database.azure.com" 
jdbc_port = 3306
src_database = "sakila" 

# REPLACE WITH KELSEY CREDENTIALS
connection_properties = {
  "user" : "kmatsik",
  "password" : "Rosie3402",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "devcluster.ed41k"
atlas_database_name = "sakila" 
atlas_user_name = "kelseymatsik"
atlas_password = "Rosie3402"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh" # dhl = data lakehouse 

base_dir = "dbfs:/FileStore/project_data" # "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

# Create retail, batch, and stream directories 
dbutils.fs.mkdirs("dbfs:/FileStore/project_data/retail")
dbutils.fs.mkdirs("dbfs:/FileStore/project_data/retail/batch")
dbutils.fs.mkdirs("dbfs:/FileStore/project_data/retail/stream")

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# Sakila DB Streaming 

## Define input locations for real-time or near-real time data ingestion
rental_stream_dir = f"{stream_dir}/rental"
payment_stream_dir = f"{stream_dir}/payment"
inventory_stream_dir = f"{stream_dir}/inventory"

## Create input locations for real-time or near-real time data ingestion
dbutils.fs.mkdirs(rental_stream_dir)
dbutils.fs.mkdirs(payment_stream_dir)
dbutils.fs.mkdirs(inventory_stream_dir)

# Fact Tables - Gold, Silver, Bronze Tiers 
## Rental 
rental_output_bronze = f"{database_dir}/fact_rental/bronze"
rental_output_silver = f"{database_dir}/fact_rental/silver"
rental_output_gold = f"{database_dir}/fact_rental/gold"

## Payments 
payment_output_bronze = f"{database_dir}/fact_payment/bronze"
payment_output_silver = f"{database_dir}/fact_payment/silver"
payment_output_gold = f"{database_dir}/fact_payment/gold"

## Inventory 
inventory_output_bronze = f"{database_dir}/fact_inventory/bronze"
inventory_output_silver = f"{database_dir}/fact_inventory/silver"
inventory_output_gold = f"{database_dir}/fact_inventory/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True) 
dbutils.fs.rm(f"{database_dir}/fact_payment", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True) # Will return 'False' the first time you run the code

True

#### 3.0. Define Global Functions

In [0]:
# Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe
  
  # Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) 
#### 1.0. Fetch Reference Data from Azure MySQL Database
##### 1.1. Create New Databricks Metadata Database 

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE; -- 'CASCADE' removes data in the database if it exists 

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Project 2 Database" -- Add metadata
LOCATION "dbfs:/FileStore/project_data/sakila_dlh" -- Specify where you want data files to be
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Project 2");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL Database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date 
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-kcm7zp.mysql.database.azure.com:3306/sakila", 
  dbtable "dim_date",
  user "kmatsik",    
  password "Rosie3402"  
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5 ; -- Verify dim_date was successfully created 

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Actor Dimension Data from an Azure MySQL database

In [0]:
%sql
-- Create a Temporary View named "view_actor" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_actor 
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-kcm7zp.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "actor",
  user "kmatsik",    --Replace with your User Name
  password "Rosie3402"  --Replace with you password
)

In [0]:
%sql
-- Create a new table named "sakila_dlh.dim_actor" using data from the view named "view_actor"
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_actor
COMMENT "Actor Dimension Table"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh/dim_actor"
AS SELECT * FROM view_actor

num_affected_rows,num_inserted_rows


##### 1.4. Create a New Table that Sources Address Dimension Data from an Azure MySQL database

In [0]:
%sql
-- Create a Temporary View named "view_address" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_address 
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-kcm7zp.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "address",
  user "kmatsik",    --Replace with your User Name
  password "Rosie3402"  --Replace with you password
)

In [0]:
%sql
-- Create a new table named "sakila_dlh.dim_address" using data from the view named "view_address"
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_address
COMMENT "Address Dimension Table"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh/dim_address"
AS SELECT * FROM view_address

num_affected_rows,num_inserted_rows


##### 1.5. Create a New Table that Sources Category Dimension Data from an Azure MySQL database

In [0]:
%sql
-- Create a Temporary View named "view_category" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_category 
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-kcm7zp.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "category",
  user "kmatsik",    --Replace with your User Name
  password "Rosie3402"  --Replace with you password
)

In [0]:
%sql
-- Create a new table named "sakila_dlh.dim_category" using data from the view named "view_category"
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_category
COMMENT "Category Dimension Table"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh/dim_category"
AS SELECT * FROM view_category

num_affected_rows,num_inserted_rows


##### 1.6. Create a New Table that Sources City Dimension Data from an Azure MySQL database

In [0]:
%sql
-- Create a Temporary View named "view_city" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_city 
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-kcm7zp.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "city",
  user "kmatsik",    --Replace with your User Name
  password "Rosie3402"  --Replace with you password
)

In [0]:
%sql
-- Create a new table named "sakila_dlh.dim_city" using data from the view named "view_city"
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_city
COMMENT "City Dimension Table"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh/dim_city"
AS SELECT * FROM view_city

num_affected_rows,num_inserted_rows


#### 2.0. Fetch Reference Data from MongoDB Atlas Database
##### View Data Files on Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  

path,name,size,modificationTime
dbfs:/FileStore/project_data/retail/batch/staff.csv,staff.csv,309,1733425907000
dbfs:/FileStore/project_data/retail/batch/store.csv,store.csv,105,1733425907000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**Note:** Only run the below cell once.

In [0]:
# Load data files from batch directory into MongoDB sakila database
source_dir = '/dbfs/FileStore/project_data/sakila_dlh/'
json_files = {"country" : 'country.json',
              "customer" : 'customer.json',
              "film_actor": 'film_actor.json',
              "film_category": 'film_category.json',
              "film": 'film.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f5b6463b980>

In [0]:
%scala
import com.mongodb.spark._

val userName = "kelseymatsik"
val pwd = "Rosie3402"
val clusterName = "devcluster.ed41k"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

##### 2.3.1. Fetch Country Dimension Data from the New MongoDB Collection

In [0]:
%scala
val df_country = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila")
.option("collection", "country").load()
.select("country", "country_id", "last_update")

display(df_country)

country,country_id,last_update
Afghanistan,1,2006-02-15 04:44:00
Algeria,2,2006-02-15 04:44:00
American Samoa,3,2006-02-15 04:44:00
Angola,4,2006-02-15 04:44:00
Anguilla,5,2006-02-15 04:44:00
Argentina,6,2006-02-15 04:44:00
Armenia,7,2006-02-15 04:44:00
Australia,8,2006-02-15 04:44:00
Austria,9,2006-02-15 04:44:00
Azerbaijan,10,2006-02-15 04:44:00


##### 2.3.2. Use the Spark DataFrame to Create a New Country Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_country.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_country")

##### 2.4.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila")
.option("collection", "customer")
.option("uri", atlas_uri).load()
.select("customer_id", "store_id", "first_name", "last_name", "email", "address_id", "active", "create_date", "last_update")

display(df_customer)

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1,2006-02-14 22:04:36,2006-02-15 04:57:20
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1,2006-02-14 22:04:36,2006-02-15 04:57:20
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1,2006-02-14 22:04:36,2006-02-15 04:57:20
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1,2006-02-14 22:04:36,2006-02-15 04:57:20
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1,2006-02-14 22:04:36,2006-02-15 04:57:20


##### 2.4.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

##### 2.5.1. Fetch Film Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_film = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila")
.option("collection", "film")
.option("uri", atlas_uri).load()
.select("film_id", "title", "description", "release_year", "language_id", "original_language_id", "rental_duration", "rental_rate", "length", "replacement_cost", "rating", "special_features", "last_update")

display(df_film)

film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15 05:03:42
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15 05:03:42
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15 05:03:42
6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,1,,3,2.99,169,17.99,PG,Deleted Scenes,2006-02-15 05:03:42
7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,1,,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes",2006-02-15 05:03:42
8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,1,,6,4.99,54,15.99,R,Trailers,2006-02-15 05:03:42
9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,1,,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes",2006-02-15 05:03:42
10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,1,,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes",2006-02-15 05:03:42


##### 2.5.2. Use the Spark DataFrame to Create a New Film Dimension Table in the Databricks Metadata Database (sakila_dlh)


In [0]:
%scala
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film")

##### 2.6.1. Fetch Film_Actor Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_film_actor = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila")
.option("collection", "film_actor")
.option("uri", atlas_uri).load()
.select("actor_id", "film_id", "last_update")

display(df_film_actor)

actor_id,film_id,last_update
1,1,2006-02-15 05:05:03
1,23,2006-02-15 05:05:03
1,25,2006-02-15 05:05:03
1,106,2006-02-15 05:05:03
1,140,2006-02-15 05:05:03
1,166,2006-02-15 05:05:03
1,277,2006-02-15 05:05:03
1,361,2006-02-15 05:05:03
1,438,2006-02-15 05:05:03
1,499,2006-02-15 05:05:03


##### 2.6.2. Use the Spark DataFrame to Create a New Film_Actor Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_film_actor.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film_actor")

##### 2.7.1. Fetch Film_Category Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_film_category = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila")
.option("collection", "film_category")
.option("uri", atlas_uri).load()
.select("category_id", "film_id", "last_update")

display(df_film_category)

category_id,film_id,last_update
6,1,2006-02-15 05:07:09
11,2,2006-02-15 05:07:09
6,3,2006-02-15 05:07:09
11,4,2006-02-15 05:07:09
8,5,2006-02-15 05:07:09
9,6,2006-02-15 05:07:09
5,7,2006-02-15 05:07:09
11,8,2006-02-15 05:07:09
11,9,2006-02-15 05:07:09
15,10,2006-02-15 05:07:09


##### 2.7.2. Use the Spark DataFrame to Create a New Film_Category Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_film_category.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film_category")

#### 3.0 Fetch Data from a File System (DBFS)
##### 3.1.1. Use PySpark to Read Staff Dimension Table from a CSV File

In [0]:
staff_csv = f"{batch_dir}/staff.csv"

df_staff = spark.read.format('csv').options(header='true', inferSchema='true').load(staff_csv)
display(df_staff)

staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16Z
2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15T03:57:16Z


##### 3.1.2. Create a New Staff Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

##### 3.2.1. Use PySpark to Read Store Dimension Table from a CSV File

In [0]:
store_csv = f"{batch_dir}/store.csv"

df_store = spark.read.format('csv').options(header='true', inferSchema='true').load(store_csv)
display(df_store)

store_id,manager_staff_id,address_id,last_update
1,1,1,2006-02-15T04:57:12Z
2,2,2,2006-02-15T04:57:12Z


##### 3.2.2. Create a New Store Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
df_store.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_store")

#### 4.0. Verify Dimension Tables


In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_country,False
sakila_dlh,dim_customer,False
sakila_dlh,dim_film,False
sakila_dlh,dim_film_actor,False
sakila_dlh,dim_film_category,False
sakila_dlh,dim_staff,False
sakila_dlh,dim_store,False
,_sqldf,True
,view_actor,True
,view_address,True


### Section III: Integrate Reference Data with Real-Time Data
#### 5.0. Use AutoLoader to Process Streaming (Hot Path) Rental Fact Data 
Fact Tables 
- Rental 
- Payment 
- Inventory
##### 5.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", rental_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rental_stream_dir)
 .createOrReplaceTempView("rental_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rental_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rental_raw_tempview
)

In [0]:
%sql
DESCRIBE EXTENDED rental_bronze_tempview

col_name,data_type,comment
customer_id,bigint,
inventory_id,bigint,
last_update,string,
rental_date,string,
rental_id,bigint,
return_date,string,
staff_id,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
# Stream new data from temporary view table into Delta Lake fact table
(spark.table("rental_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5b64634c90>

##### 5.2. Silver Table: Include Reference Data

In [0]:
# Read in silver temporary view table from bronze table
(spark.readStream
  .table("fact_rental_bronze")
  .createOrReplaceTempView("rental_silver_tempview"))

In [0]:
%sql
SELECT * FROM rental_silver_tempview LIMIT 5;

customer_id,inventory_id,last_update,rental_date,rental_id,return_date,staff_id,_rescued_data,receipt_time,source_file
130,367,2006-02-15 21:30:53,2005-05-24 22:53:30,1,2005-05-26 22:04:30,1,,2024-12-08T03:13:21.307Z,dbfs:/FileStore/project_data/retail/stream/rental/rental.json
459,1525,2006-02-15 21:30:53,2005-05-24 22:54:33,2,2005-05-28 19:40:33,1,,2024-12-08T03:13:21.307Z,dbfs:/FileStore/project_data/retail/stream/rental/rental.json
408,1711,2006-02-15 21:30:53,2005-05-24 23:03:39,3,2005-06-01 22:12:39,1,,2024-12-08T03:13:21.307Z,dbfs:/FileStore/project_data/retail/stream/rental/rental.json
333,2452,2006-02-15 21:30:53,2005-05-24 23:04:41,4,2005-06-03 01:43:41,2,,2024-12-08T03:13:21.307Z,dbfs:/FileStore/project_data/retail/stream/rental/rental.json
222,2079,2006-02-15 21:30:53,2005-05-24 23:05:21,5,2005-06-02 04:33:21,1,,2024-12-08T03:13:21.307Z,dbfs:/FileStore/project_data/retail/stream/rental/rental.json


In [0]:
%sql
DESCRIBE EXTENDED rental_silver_tempview

col_name,data_type,comment
customer_id,bigint,
inventory_id,bigint,
last_update,string,
rental_date,string,
rental_id,bigint,
return_date,string,
staff_id,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
-- Select data to be included in fact rental table
CREATE OR REPLACE TEMPORARY VIEW fact_rental_silver_tempview AS (
  SELECT 
        r.rental_id,
        r.inventory_id,
        r.customer_id, -- join with customer table
        r.staff_id, -- join with staff table
        r.rental_date, 
        r.return_date, 
        r.receipt_time, -- Modify name later?
        CONCAT(cu.first_name, ' ', cu.last_name) AS customer_name, -- Join with customer table
        cu.email AS customer_email_address,  -- join with customer
        a.address AS customer_address, -- join with address
        ci.city AS customer_address_city, -- join with city
        a.postal_code AS customer_address_postal_code, 
        a.phone AS customer_phone_number, 
        rta.day_name_of_week AS rental_day_of_week, -- rta = rental data
        rta.weekday_weekend AS rental_weekday_weekend, 
        rta.calendar_quarter AS rental_calendar_quarter,
        rtn.day_name_of_week AS return_day_of_week, -- rtn = return data
        rtn.weekday_weekend AS return_weekday_weekend,
        rtn.calendar_quarter AS return_calendar_quarter
  FROM rental_silver_tempview AS r 
    INNER JOIN sakila_dlh.dim_customer AS cu
    ON r.customer_id = cu.customer_id
    INNER JOIN sakila_dlh.dim_address AS a 
    ON cu.address_id = a.address_id
    INNER JOIN sakila_dlh.dim_city AS ci 
    ON a.city_id = ci.city_id
    LEFT OUTER JOIN sakila_dlh.dim_date AS rta
    ON r.rental_date = rta.full_date
    LEFT OUTER JOIN sakila_dlh.dim_date AS rtn
    ON r.return_date = rtn.full_date
);

In [0]:
(spark.table("fact_rental_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5b647d1d90>

In [0]:
%sql
SELECT * FROM fact_rental_silver LIMIT 5

rental_id,inventory_id,customer_id,staff_id,rental_date,return_date,receipt_time,customer_name,customer_email_address,customer_address,customer_address_city,customer_address_postal_code,customer_phone_number,rental_day_of_week,rental_weekday_weekend,rental_calendar_quarter,return_day_of_week,return_weekday_weekend,return_calendar_quarter
1,367,130,1,2005-05-24 22:53:30,2005-05-26 22:04:30,2024-12-08T03:13:21.307Z,CHARLOTTE HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,758 Junan Lane,Águas Lindas de Goiás,82639,935448624185,Tuesday,Weekday,2,Thursday,Weekday,2
2,1525,459,1,2005-05-24 22:54:33,2005-05-28 19:40:33,2024-12-08T03:13:21.307Z,TOMMY COLLAZO,TOMMY.COLLAZO@sakilacustomer.org,76 Kermanshah Manor,Qomsheh,23343,762361821578,Tuesday,Weekday,2,Saturday,Weekend,2
3,1711,408,1,2005-05-24 23:03:39,2005-06-01 22:12:39,2024-12-08T03:13:21.307Z,MANUEL MURRELL,MANUEL.MURRELL@sakilacustomer.org,692 Amroha Drive,Jaffna,35575,359478883004,Tuesday,Weekday,2,Wednesday,Weekday,2
4,2452,333,2,2005-05-24 23:04:41,2005-06-03 01:43:41,2024-12-08T03:13:21.307Z,ANDREW PURDY,ANDREW.PURDY@sakilacustomer.org,431 Székesfehérvár Avenue,Baku,57828,119501405123,Tuesday,Weekday,2,Friday,Weekday,2
5,2079,222,1,2005-05-24 23:05:21,2005-06-02 04:33:21,2024-12-08T03:13:21.307Z,DELORES HANSEN,DELORES.HANSEN@sakilacustomer.org,810 Palghat (Palakkad) Boulevard,Jaroslavl,73431,516331171356,Tuesday,Weekday,2,Thursday,Weekday,2


##### 5.3. Gold Table: Perform Aggregations

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_most_recent_customer_rentals AS (
  SELECT customer_name AS CustomerName
  , customer_email_address AS CustomerEmailAddress
  , customer_phone_number As CustomerPhoneNumber
  , customer_address as CustomerAddress
  , customer_address_city as CustomerAddressCity
  , customer_address_postal_code as CustomerAddressPostalCode
  , rental_date AS MovieRentalDate
  , return_date AS MovieReturnDate
FROM sakila_dlh.fact_rental_silver
GROUP BY CustomerName, CustomerEmailAddress, CustomerPhoneNumber, CustomerAddress, CustomerAddressCity, CustomerAddressPostalCode, MovieRentalDate, MovieReturnDate
ORDER BY MovieRentalDate DESC); 

SELECT * FROM sakila_dlh.fact_most_recent_customer_rentals LIMIT 15; 
  

CustomerName,CustomerEmailAddress,CustomerPhoneNumber,CustomerAddress,CustomerAddressCity,CustomerAddressPostalCode,MovieRentalDate,MovieReturnDate
JUDITH COX,JUDITH.COX@sakilacustomer.org,333489324603,1966 Amroha Avenue,Daxian,70385,2005-05-31 00:46:31,2005-06-06 06:14:31
STEPHEN QUALLS,STEPHEN.QUALLS@sakilacustomer.org,38988715447,1838 Tabriz Lane,Dhaka,1195,2005-05-31 00:25:56,2005-06-08 19:42:56
MARVIN YEE,MARVIN.YEE@sakilacustomer.org,480039662421,126 Acuña Parkway,Berhampore (Baharampur),58888,2005-05-31 00:25:10,2005-06-03 06:05:10
ALAN KAHN,ALAN.KAHN@sakilacustomer.org,464511145118,753 Ilorin Avenue,Emeishan,3656,2005-05-31 00:16:57,2005-06-01 22:41:57
NATALIE MEYER,NATALIE.MEYER@sakilacustomer.org,873492228462,1201 Qomsheh Manor,Aparecida de Goiânia,21464,2005-05-31 00:08:25,2005-06-02 00:17:25
ERIC ROBERT,ERIC.ROBERT@sakilacustomer.org,105470691550,430 Kumbakonam Drive,Santa Fé,28814,2005-05-31 00:06:20,2005-05-31 21:29:20
DANIELLE DANIELS,DANIELLE.DANIELS@sakilacustomer.org,632316273199,781 Shimonoseki Drive,Hidalgo,95444,2005-05-31 00:06:02,2005-06-06 02:30:02
GREG ROBINS,GREG.ROBINS@sakilacustomer.org,206060652238,1786 Salinas Place,Nam Dinh,66546,2005-05-30 23:55:36,2005-06-04 01:00:36
DUANE TUBBS,DUANE.TUBBS@sakilacustomer.org,282667506728,962 Tama Loop,Yangor,65952,2005-05-30 23:54:19,2005-06-04 21:27:19
SHANNON FREEMAN,SHANNON.FREEMAN@sakilacustomer.org,677976133614,117 Boa Vista Way,Varanasi (Benares),6804,2005-05-30 23:47:56,2005-06-05 19:01:56


#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Payment Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", payment_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(payment_stream_dir)
 .createOrReplaceTempView("payment_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW payment_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM payment_raw_tempview
)

In [0]:
# Stream new data from temporary view table into Delta Lake fact table
(spark.table("payment_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{payment_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_payment_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5b64682390>

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Read in silver temporary view table from bronze table
(spark.readStream
  .table("fact_payment_bronze")
  .createOrReplaceTempView("payment_silver_tempview"))

In [0]:
%sql
DESCRIBE EXTENDED payment_silver_tempview

col_name,data_type,comment
amount,double,
customer_id,bigint,
last_update,string,
payment_date,string,
payment_id,bigint,
rental_id,bigint,
staff_id,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
-- Select data to be included in payment rental table
CREATE OR REPLACE TEMPORARY VIEW fact_payment_silver_tempview AS (
  SELECT 
        p.payment_id, 
        p.customer_id, 
        CONCAT(cu.first_name, ' ', cu.last_name) AS customer_name, -- Join with customer table
        cu.email AS customer_email_address, 
        p.amount, 
        p.payment_date, 
        CONCAT(sa.first_name, ' ', sa.last_name) AS staff_full_name, -- Join with staff on p.staff_id = s.staff_id
        sa.username AS staff_username,
        sa.password AS staff_password, 
        sa.last_update AS staff_last_update, 
        so.manager_staff_id, -- INNER JOIN staff AS sa with store as so ON sa.store_id = so.store_id
        so.address_id, 
        a.address AS store_address, -- INNER JOIN sakila_dlh.dim_address AS a ON so.address_id = a.address_id
        c.city AS store_city, -- INNER JOIN sakila_dlh.dim_city AS c ON a.city_id = c.city_id
        a.postal_code AS store_postal_code, 
        pd.calendar_year_qtr AS payment_calendar_yr_quarter, -- pd = payment data
        pd.fiscal_month_of_year AS payment_fiscal_month_of_year, 
        pd.fiscal_quarter AS payment_fiscal_quarter, 
        pd.fiscal_year AS payment_fiscal_year  
  FROM payment_silver_tempview AS p 
    INNER JOIN sakila_dlh.dim_customer AS cu
    ON p.customer_id = cu.customer_id
    INNER JOIN sakila_dlh.dim_staff AS sa 
    ON p.staff_id = sa.staff_id
    INNER JOIN sakila_dlh.dim_store AS so 
    ON sa.store_id = so.store_id
    INNER JOIN sakila_dlh.dim_address AS a 
    ON so.address_id = a.address_id
    INNER JOIN sakila_dlh.dim_city AS c 
    ON a.city_id = c.city_id
    LEFT OUTER JOIN sakila_dlh.dim_date AS pd
    ON p.payment_date = pd.full_date
);

In [0]:
(spark.table("fact_payment_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{payment_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_payment_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5b64511410>

In [0]:
%sql
SELECT * FROM fact_payment_silver LIMIT 5;

payment_id,customer_id,customer_name,customer_email_address,amount,payment_date,staff_full_name,staff_username,staff_password,staff_last_update,manager_staff_id,address_id,store_address,store_city,store_postal_code,payment_calendar_yr_quarter,payment_fiscal_month_of_year,payment_fiscal_quarter,payment_fiscal_year


##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql 
-- TEST LATER 
CREATE OR REPLACE TABLE sakila_dlh.fact_total_movie_payments AS (
  SELECT customer_name AS CustomerName
  , SUM(amount) AS TotalAmountPaid
  , customer_email_address AS CustomerEmailAddress
  , payment_date AS PaymentDate
  , store_address AS StoreAddress
  , store_city AS StoreCity
  , store_postal_code AS StorePostalCode
  FROM fact_payment_silver
  GROUP BY CustomerName, CustomerEmailAddress, PaymentDate, StoreAddress, StoreCity, StorePostalCode
  ORDER BY TotalAmountPaid DESC); 

  SELECT * FROM sakila_dlh.fact_total_movie_payments LIMIT 10;

CustomerName,TotalAmountPaid,CustomerEmailAddress,PaymentDate,StoreAddress,StoreCity,StorePostalCode
KAREN JACKSON,11.99,KAREN.JACKSON@sakilacustomer.org,2005-07-29 22:37:41,28 MySQL Boulevard,Woodridge,
MICHELLE CLARK,10.99,MICHELLE.CLARK@sakilacustomer.org,2005-06-21 01:04:35,47 MySakila Drive,Lethbridge,
PATRICIA JOHNSON,10.99,PATRICIA.JOHNSON@sakilacustomer.org,2005-07-30 13:47:43,28 MySQL Boulevard,Woodridge,
NANCY THOMAS,10.99,NANCY.THOMAS@sakilacustomer.org,2005-08-01 06:50:26,28 MySQL Boulevard,Woodridge,
ANGELA HERNANDEZ,10.99,ANGELA.HERNANDEZ@sakilacustomer.org,2005-07-09 21:55:19,28 MySQL Boulevard,Woodridge,
ANNA HILL,10.99,ANNA.HILL@sakilacustomer.org,2005-06-15 09:46:33,47 MySakila Drive,Lethbridge,
LINDA WILLIAMS,10.99,LINDA.WILLIAMS@sakilacustomer.org,2005-07-27 20:23:12,28 MySQL Boulevard,Woodridge,
KAREN JACKSON,9.99,KAREN.JACKSON@sakilacustomer.org,2005-07-30 14:38:22,28 MySQL Boulevard,Woodridge,
CYNTHIA YOUNG,9.99,CYNTHIA.YOUNG@sakilacustomer.org,2005-07-27 23:07:40,47 MySakila Drive,Lethbridge,
RUTH MARTINEZ,9.99,RUTH.MARTINEZ@sakilacustomer.org,2005-07-31 20:01:06,28 MySQL Boulevard,Woodridge,


#### 7.0. Use AutoLoader to Process Streaming (Hot Path) Inventory Fact Data 
##### 7.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", inventory_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(inventory_stream_dir)
 .createOrReplaceTempView("inventory_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW inventory_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM inventory_raw_tempview
)

In [0]:
# Stream new data from temporary view table into Delta Lake fact table
(spark.table("inventory_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inventory_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5b64554ed0>

##### 7.2. Silver Table: Include Reference Data

In [0]:
# Read in silver temporary view table from bronze table
(spark.readStream
  .table("fact_inventory_bronze")
  .createOrReplaceTempView("inventory_silver_tempview"))

In [0]:
%sql
DESCRIBE EXTENDED inventory_silver_tempview

col_name,data_type,comment
film_id,bigint,
inventory_id,bigint,
last_update,string,
store_id,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
-- Select data to be included in inventory rental table
CREATE OR REPLACE TEMPORARY VIEW fact_inventory_silver_tempview AS (
  SELECT 
        i.inventory_id, 
        i.film_id, 
        f.title AS film_title,
        f.description AS film_description, 
        f.release_year AS film_release_year, 
        f.rental_rate AS film_rental_rate, 
        f.length AS film_length, 
        f.replacement_cost AS film_replacement_cost, 
        f.rating AS film_rating, 
        f.special_features AS film_special_features, 
        c.name AS film_category,
        CONCAT(a.first_name, ' ', a.last_name) AS actor_name,
        r.rental_date AS rental_date -- comment out if not working
  FROM inventory_silver_tempview AS i 
    INNER JOIN sakila_dlh.dim_film AS f
    ON i.film_id = f.film_id
    INNER JOIN sakila_dlh.dim_film_category AS fc
    ON f.film_id = fc.film_id
    INNER JOIN sakila_dlh.dim_category AS c
    ON fc.category_id = c.category_id
    INNER JOIN sakila_dlh.dim_film_actor AS fa
    ON f.film_id = fa.film_id
    INNER JOIN sakila_dlh.dim_actor AS a
    ON fa.actor_id = a.actor_id
    INNER JOIN sakila_dlh.fact_rental_silver AS r -- comment out if not working
    ON i.inventory_id = r.inventory_id
);

In [0]:
%python
(spark.table("fact_inventory_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inventory_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5b64569890>

In [0]:
%sql 
DESCRIBE EXTENDED fact_inventory_silver_tempview

col_name,data_type,comment
inventory_id,bigint,
film_id,bigint,
film_title,string,
film_description,string,
film_release_year,int,
film_rental_rate,double,
film_length,int,
film_replacement_cost,double,
film_rating,string,
film_special_features,string,


##### 7.3. Gold Table: Perform Aggregations

In [0]:
%sql
-- Creating or replacing a new table for this aggregation isn't working
CREATE OR REPLACE TABLE sakila_dlh.fact_most_rented_movies AS (
  SELECT film_title AS FilmTitle
  , film_rating AS FilmRating
  , film_category AS FilmCategory
  , actor_name AS FilmActor
  , COUNT(*) AS NumberTimesRented
  , MAX(rental_date) AS LastDateRented
FROM sakila_dlh.fact_inventory_silver
GROUP BY FilmTitle, FilmRating, FilmCategory, FilmActor
ORDER BY NumberTimesRented DESC); 

SELECT * FROM sakila_dlh.fact_most_rented_movies LIMIT 20; 

FilmTitle,FilmRating,FilmCategory,FilmActor,NumberTimesRented,LastDateRented
BOOGIE AMELIE,R,Music,KEVIN BLOOM,4,2005-05-30 22:59:12
CHICKEN HELLFIGHTERS,PG,Documentary,KIRSTEN PALTROW,3,2005-05-30 18:53:21
BUCKET BROTHERHOOD,PG,Travel,TIM HACKMAN,3,2005-05-30 08:02:56
CHEAPER CLYDE,G,Sci-Fi,LUCILLE TRACY,3,2005-05-30 17:30:28
COAST RAINBOW,PG,Documentary,BETTE NICHOLSON,3,2005-05-27 11:45:49
CENTER DINOSAUR,PG,Classics,ALEC WAYNE,3,2005-05-30 06:15:36
BUCKET BROTHERHOOD,PG,Travel,RIP CRAWFORD,3,2005-05-30 08:02:56
BINGO TALENTED,PG-13,Sci-Fi,GOLDIE BRODY,3,2005-05-28 20:16:20
COAST RAINBOW,PG,Documentary,SISSY SOBIESKI,3,2005-05-27 11:45:49
COAST RAINBOW,PG,Documentary,CAMERON STREEP,3,2005-05-27 11:45:49
