
### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import json
import os
from urllib.parse import quote_plus

import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# ---- Azure MySQL (reference-data source) ------------------------------------
# NOTE(review): user names and passwords are hardcoded in this cell; consider
# loading them from a Databricks secret scope before sharing the notebook.
jdbc_hostname = "mst9fd-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

connection_properties = dict(
    user="mst9fd",
    password="Password123!",
    driver="org.mariadb.jdbc.Driver",
)

# ---- MongoDB Atlas -----------------------------------------------------------
atlas_cluster_name = "cluster0.x3wvdfc"
atlas_database_name = "sakila_dw2"
atlas_user_name = "uvamst9fd"
atlas_password = "Password123"

# ---- DBFS layout -------------------------------------------------------------
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/project data"
database_dir = f"{base_dir}/{dst_database}"

# Batch (cold-path) and stream (hot-path) source files both live under base_dir.
data_dir = base_dir
batch_dir, stream_dir = (f"{data_dir}/{kind}" for kind in ("batch", "stream"))

rental_stream_dir, inventory_stream_dir, customer_stream_dir = (
    f"{stream_dir}/{entity}" for entity in ("rental", "inventory", "customer")
)

# One output directory per medallion layer of the rentals fact table.
rentals_output_bronze, rentals_output_silver, rentals_output_gold = (
    f"{database_dir}/fact_rental/{layer}" for layer in ("bronze", "silver", "gold")
)

# ---- Reset state left by earlier runs ------------------------------------------
# Remove the streaming output (tables and checkpoints) first ...
dbutils.fs.rm(f"{database_dir}/fact_rental", True)

# ... then the whole database directory. Kept as the last expression so the
# cell still displays the result of this call.
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Fetch documents from a MongoDB Atlas collection into a DataFrame.

    Args:
        user_id: Atlas database user name.
        pwd: Password for ``user_id``.
        cluster_name: Atlas cluster host prefix; ``.mongodb.net`` is appended.
        db_name: Name of the database to query.
        collection: Name of the collection to read.
        conditions: Query filter document. A falsy value matches every document.
        projection: Projection passed to ``find``. A falsy value returns all fields.
        sort: Sort specification passed to ``Cursor.sort``. A falsy value leaves
            the documents in the order the server returns them.

    Returns:
        A ``pd.DataFrame`` (``pd`` is ``pyspark.pandas`` in this notebook) built
        from the matching documents.
    """
    # Credentials must be percent-escaped, otherwise characters such as '@',
    # ':' or '/' in a password produce an invalid connection string.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(str(user_id))}:{quote_plus(str(pwd))}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )

    client = pymongo.MongoClient(mongo_uri)
    try:
        # Apply each argument independently: previously a filter without a
        # projection (or a sort without both) was silently ignored and the
        # whole collection was returned.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialise the cursor while the connection is still open.
        documents = list(cursor)
    finally:
        # Release the connection even when the query raises.
        client.close()

    return pd.DataFrame(documents)

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """Recreate MongoDB Atlas collections from JSON files.

    For every entry in ``json_files`` the JSON file is loaded, the collection
    of that name is dropped, and the loaded documents are inserted into it.

    Args:
        user_id: Atlas database user name.
        pwd: Password for ``user_id``.
        cluster_name: Atlas cluster host prefix; ``.mongodb.net`` is appended.
        db_name: Name of the target database.
        src_file_path: Directory that contains the JSON files.
        json_files: Dict mapping collection name -> JSON file name.

    Returns:
        The ``insert_many`` result of the last collection loaded, or ``None``
        when ``json_files`` is empty.
    """
    # Credentials must be percent-escaped, otherwise characters such as '@',
    # ':' or '/' in a password produce an invalid connection string.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(str(user_id))}:{quote_plus(str(pwd))}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )
    client = pymongo.MongoClient(mongo_uri)

    # Defined up front so an empty json_files returns None instead of raising
    # UnboundLocalError at the return statement.
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            json_path = os.path.join(src_file_path, file_name)
            # Load the file before dropping anything, so a missing or invalid
            # file does not destroy the existing collection.
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)

            db.drop_collection(collection_name)
            result = db[collection_name].insert_many(documents)
    finally:
        # Release the connection even when a load or insert fails.
        client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
-- Start from a clean slate: drop the database and, via CASCADE, every table in it.
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
-- Recreate the metadata database at a fixed DBFS location. The path is the same
-- string the Python config cell builds as database_dir (base_dir + "/sakila_dlh").
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Capstone Project Database"
LOCATION "dbfs:/FileStore/project data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone Project");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
-- Temporary view over the "dim_date" table of the Azure MySQL database
-- "northwind_dw2", read through Spark's JDBC data source.
-- The URL must not contain whitespace after "jdbc:mysql://"; it now has the
-- same shape as the URL used for view_rental.
-- NOTE(review): the password is hardcoded here; consider a secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://mst9fd-mysql.mysql.database.azure.com:3306/northwind_dw2", --Replace with your Server Name
  dbtable "dim_date",
  user "mst9fd",    --Replace with your User Name
  password "Password123!"  --Replace with your password
)

In [0]:
%sql
-- Make sakila_dlh the current database for the statements that follow.
USE DATABASE sakila_dlh;

-- Materialise the JDBC-backed view_date as a table stored under the database directory.
CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/project data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column names and types of the new date dimension table.
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
-- Preview a few rows to confirm the date dimension was loaded.
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Rental Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_rental" that extracts data from your MySQL Sakila database.
-- The view reads the "rental" table through Spark's JDBC data source.
-- NOTE(review): the password is hardcoded here; consider a secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_rental
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://mst9fd-mysql.mysql.database.azure.com:3306/sakila",
  dbtable "rental",
  user "mst9fd",
  password "Password123!"
)

In [0]:
%sql
-- Make sakila_dlh the current database for the statements that follow.
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_rentals" using data from the view named "view_rental"
CREATE OR REPLACE TABLE sakila_dlh.dim_rentals
COMMENT "Rental Dimension Table"
LOCATION "dbfs:/FileStore/project data/sakila_dlh/dim_rentals"
AS SELECT * FROM view_rental

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column names and types of the rental dimension table.
DESCRIBE EXTENDED sakila_dlh.dim_rentals;

col_name,data_type,comment
rental_id,int,
rental_date,timestamp,
inventory_id,bigint,
customer_id,int,
return_date,timestamp,
staff_id,int,
last_update,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"customer_id, rental_date, rental_id, inventory_id, last_update, staff_id, return_date",


In [0]:
%sql
-- Preview a few rows to confirm the rental dimension was loaded.
SELECT * FROM sakila_dlh.dim_rentals LIMIT 5

rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
1,2005-05-24T22:53:30Z,367,130,2005-05-26T22:04:30Z,1,2006-02-15T21:30:53Z
2,2005-05-24T22:54:33Z,1525,459,2005-05-28T19:40:33Z,1,2006-02-15T21:30:53Z
3,2005-05-24T23:03:39Z,1711,408,2005-06-01T22:12:39Z,1,2006-02-15T21:30:53Z
4,2005-05-24T23:04:41Z,2452,333,2005-06-03T01:43:41Z,2,2006-02-15T21:30:53Z
5,2005-05-24T23:05:21Z,2079,222,2005-06-02T04:33:21Z,1,2006-02-15T21:30:53Z


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
# List the batch source files; batch_dir is 'dbfs:/FileStore/project data/batch'.
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/project data/batch/fact_rental.json,fact_rental.json,634662,1715286978000
dbfs:/FileStore/project data/batch/sakila_customer.json,sakila_customer.json,160173,1715286978000
dbfs:/FileStore/project data/batch/sakila_inventory.json,sakila_inventory.json,107401,1715286979000
dbfs:/FileStore/project data/batch/sakila_rental.json,sakila_rental.json,215475,1715286979000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
# Directory handed to set_mongo_collection(), which opens each file with
# Python's built-in open() and therefore needs a local-style path.
source_dir = '/dbfs/FileStore/project data/batch'

# Collection name -> JSON file (inside source_dir) used to populate it.
json_files = {
    "customers": 'sakila_customer.json',
    "inventory": 'sakila_inventory.json',
    "rental": 'sakila_rental.json',
}

# Left as the last expression so the cell displays the returned insert result.
set_mongo_collection(
    user_id=atlas_user_name,
    pwd=atlas_password,
    cluster_name=atlas_cluster_name,
    db_name=atlas_database_name,
    src_file_path=source_dir,
    json_files=json_files,
)

<pymongo.results.InsertManyResult at 0x7efc38255540>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
// Build the MongoDB Atlas connection string used by the Scala read cells below.
import com.mongodb.spark._

// NOTE(review): these literals repeat the values of atlas_user_name,
// atlas_password and atlas_cluster_name from the Python config cell, and the
// credentials are hardcoded; consider a secret scope.
val userName = "uvamst9fd"
val pwd = "Password123"
val clusterName = "cluster0.x3wvdfc"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

// Read the "customers" collection of the sakila_dw2 database through the
// MongoDB Spark connector and keep only the columns used by the customer dimension.
val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw2")
.option("collection", "customers").load()
.select("customer_id","store_id","first_name","last_name","email","address_id","active")

// NOTE(review): the rendered output contains customer names and e-mail
// addresses; clear it before sharing if this is not sample data.
display(df_customer)

customer_id,store_id,first_name,last_name,email,address_id,active
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1


In [0]:
%scala
// Show the column names and types of the customer DataFrame.
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
// Persist the customer DataFrame as a Delta table, replacing any existing contents.
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
-- Inspect the column names and types of the customer dimension table.
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,int,
,,
# Delta Statistics Columns,,
Column Names,"first_name, customer_id, email, store_id, address_id, last_name, active",


In [0]:
%sql
-- Preview a few rows to confirm the customer dimension was loaded.
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_id,store_id,first_name,last_name,email,address_id,active
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1


##### 2.4.1 Fetch Inventory Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// Read the "inventory" collection of the sakila_dw2 database through the
// MongoDB Spark connector; atlas_uri is defined in the earlier Scala cell.
// NOTE(review): this cell passes the connection string as "uri" while the
// customer cell uses "spark.mongodb.input.uri" -- presumably equivalent for
// this connector version; confirm and use one form in both cells.
val df_inventory = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2")
.option("collection", "inventory")
.option("uri", atlas_uri).load()
.select("inventory_id","film_id","store_id")

display(df_inventory)

inventory_id,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2
6,1,2
7,1,2
8,1,2
9,2,2
10,2,2


In [0]:
%scala
// Show the column names and types of the inventory DataFrame.
df_inventory.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Inventory Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
// Persist the inventory DataFrame as a Delta table, replacing any existing contents.
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
-- Inspect the column names and types of the inventory dimension table.
DESCRIBE EXTENDED sakila_dlh.dim_inventory

col_name,data_type,comment
inventory_id,int,
film_id,int,
store_id,int,
,,
# Delta Statistics Columns,,
Column Names,"inventory_id, film_id, store_id",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
-- Preview a few rows to confirm the inventory dimension was loaded.
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

inventory_id,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2


##### Verify Dimension Tables

In [0]:
%sql
-- List what now exists in sakila_dlh: the four dimension tables plus the
-- session's temporary views.
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_rentals,False
,view_date,True
,view_rental,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Rentals Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Bronze source: read the JSON files in rental_stream_dir as a stream with
# Auto Loader ("cloudFiles") and expose the stream as the temporary view
# "rental_raw_tempview" for the SQL cells below.
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 # Schema hints are disabled; column types are inferred instead (see
 # cloudFiles.inferColumnTypes below).
 # NOTE(review): if hints are re-enabled, pass them as ONE comma-separated
 # "cloudFiles.schemaHints" option -- repeating .option() with the same key
 # keeps only the last value. The hints drafted earlier were:
 #   store_key BIGINT, address_key BIGINT, address STRING, district STRING,
 #   customer_key BIGINT, first_name STRING, last_name STRING, email STRING,
 #   active BIGINT, payment_key BIGINT, amount DECIMAL, rental_key BIGINT,
 #   inventory_key BIGINT, film_key BIGINT, title STRING, release_year BIGINT,
 #   rental_duration BIGINT, rental_rate DECIMAL, length BIGINT,
 #   replacement_cost DECIMAL, rating STRING, rental_date_key BIGINT,
 #   return_date_key BIGINT, payment_date_key BIGINT
 # The inferred schema is tracked in the bronze output directory.
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rental_stream_dir)
 .createOrReplaceTempView("rental_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
-- receipt_time records when the row was processed; source_file records the
-- path of the file the row was read from.
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rental_raw_tempview
)

In [0]:
%sql
-- Preview the bronze view. It is backed by a streaming read, so this is a
-- live streaming display rather than a one-off query.
SELECT * FROM rentals_bronze_tempview

active,address,address_key,amount,customer_id,customer_key,district,email,film_key,first_name,inventory_id,inventory_key,language_id,last_name,last_update,length,payment_date_key,payment_key,rating,release_year,rental_date,rental_date_key,rental_duration,rental_id,rental_key,rental_rate,replacement_cost,return_date,return_date_key,staff_id,store_key,title,_rescued_data,receipt_time,source_file
1,47 MySakila Drive,1,2.99,,130,Alberta,CHARLOTTE.HUNTER@sakilacustomer.org,80,CHARLOTTE,,367,1,HUNTER,,148,,3503,G,2006,,,7,,1,2.99,21.99,,,,1,BLANKET BEVERLY,,2024-05-10T20:32:25.129Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,2.99,,459,Alberta,TOMMY.COLLAZO@sakilacustomer.org,333,TOMMY,,1525,1,COLLAZO,,126,,12374,R,2006,,,7,,2,2.99,16.99,,,,1,FREAKY POCUS,,2024-05-10T20:32:25.129Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,3.99,,408,Alberta,MANUEL.MURRELL@sakilacustomer.org,373,MANUEL,,1711,1,MURRELL,,156,,11029,G,2006,,,7,,3,2.99,14.99,,,,1,GRADUATE LORD,,2024-05-10T20:32:25.129Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,28 MySQL Boulevard,2,4.99,,333,QLD,ANDREW.PURDY@sakilacustomer.org,535,ANDREW,,2452,1,PURDY,,181,,8985,R,2006,,,6,,4,0.99,21.99,,,,2,LOVE SUICIDES,,2024-05-10T20:32:25.129Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,28 MySQL Boulevard,2,6.99,,222,QLD,DELORES.HANSEN@sakilacustomer.org,450,DELORES,,2079,1,HANSEN,,84,,6002,NC-17,2006,,,5,,5,2.99,29.99,,,,2,IDOLS SNATCHERS,,2024-05-10T20:32:25.129Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,0.99,,549,Alberta,NELSON.CHRISTENSON@sakilacustomer.org,613,NELSON,,2792,1,CHRISTENSON,,92,,14724,NC-17,2006,,,5,,6,0.99,19.99,,,,1,MYSTIC TRUMAN,,2024-05-10T20:32:25.129Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,1.99,,269,Alberta,CASSANDRA.WALTERS@sakilacustomer.org,870,CASSANDRA,,3995,1,WALTERS,,123,,7272,PG-13,2006,,,4,,7,0.99,12.99,,,,1,SWARM GOLD,,2024-05-10T20:32:25.129Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,28 MySQL Boulevard,2,4.99,,239,QLD,MINNIE.ROMERO@sakilacustomer.org,510,MINNIE,,2346,1,ROMERO,,181,,6439,G,2006,,,6,,8,4.99,29.99,,,,2,LAWLESS VISION,,2024-05-10T20:32:25.129Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,4.99,,126,Alberta,ELLEN.SIMPSON@sakilacustomer.org,565,ELLEN,,2580,1,SIMPSON,,56,,3385,PG-13,2006,,,6,,9,4.99,9.99,,,,1,MATRIX SNOWMAN,,2024-05-10T20:32:25.129Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,5.99,,399,Alberta,DANNY.ISOM@sakilacustomer.org,396,DANNY,,1824,1,ISOM,,62,,10783,G,2006,,,5,,10,4.99,18.99,,,,1,HANGING DEEP,,2024-05-10T20:32:25.129Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json


In [0]:
# Start a streaming query that appends the bronze view to the Delta table
# "fact_rentals_bronze". The table name is unqualified, so it resolves against
# the session's current database. Progress is tracked in a checkpoint directory
# under the bronze output path. No trigger is configured, and the returned
# StreamingQuery handle is displayed as the cell output.
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7efc1c6eb730>

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Silver source: read the bronze Delta table as a stream and expose it as the
# temporary view "rentals_silver_tempview" for the SQL cells below.
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
-- Preview the rows arriving from the bronze table (streaming view).
SELECT * FROM rentals_silver_tempview

active,address,address_key,amount,customer_id,customer_key,district,email,film_key,first_name,inventory_id,inventory_key,language_id,last_name,last_update,length,payment_date_key,payment_key,rating,release_year,rental_date,rental_date_key,rental_duration,rental_id,rental_key,rental_rate,replacement_cost,return_date,return_date_key,staff_id,store_key,title,_rescued_data,receipt_time,source_file
1,47 MySakila Drive,1,2.99,,130,Alberta,CHARLOTTE.HUNTER@sakilacustomer.org,80,CHARLOTTE,,367,1,HUNTER,,148,,3503,G,2006,,,7,,1,2.99,21.99,,,,1,BLANKET BEVERLY,,2024-05-10T20:34:54.629Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,2.99,,459,Alberta,TOMMY.COLLAZO@sakilacustomer.org,333,TOMMY,,1525,1,COLLAZO,,126,,12374,R,2006,,,7,,2,2.99,16.99,,,,1,FREAKY POCUS,,2024-05-10T20:34:54.629Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,3.99,,408,Alberta,MANUEL.MURRELL@sakilacustomer.org,373,MANUEL,,1711,1,MURRELL,,156,,11029,G,2006,,,7,,3,2.99,14.99,,,,1,GRADUATE LORD,,2024-05-10T20:34:54.629Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,28 MySQL Boulevard,2,4.99,,333,QLD,ANDREW.PURDY@sakilacustomer.org,535,ANDREW,,2452,1,PURDY,,181,,8985,R,2006,,,6,,4,0.99,21.99,,,,2,LOVE SUICIDES,,2024-05-10T20:34:54.629Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,28 MySQL Boulevard,2,6.99,,222,QLD,DELORES.HANSEN@sakilacustomer.org,450,DELORES,,2079,1,HANSEN,,84,,6002,NC-17,2006,,,5,,5,2.99,29.99,,,,2,IDOLS SNATCHERS,,2024-05-10T20:34:54.629Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,0.99,,549,Alberta,NELSON.CHRISTENSON@sakilacustomer.org,613,NELSON,,2792,1,CHRISTENSON,,92,,14724,NC-17,2006,,,5,,6,0.99,19.99,,,,1,MYSTIC TRUMAN,,2024-05-10T20:34:54.629Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,1.99,,269,Alberta,CASSANDRA.WALTERS@sakilacustomer.org,870,CASSANDRA,,3995,1,WALTERS,,123,,7272,PG-13,2006,,,4,,7,0.99,12.99,,,,1,SWARM GOLD,,2024-05-10T20:34:54.629Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,28 MySQL Boulevard,2,4.99,,239,QLD,MINNIE.ROMERO@sakilacustomer.org,510,MINNIE,,2346,1,ROMERO,,181,,6439,G,2006,,,6,,8,4.99,29.99,,,,2,LAWLESS VISION,,2024-05-10T20:34:54.629Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,4.99,,126,Alberta,ELLEN.SIMPSON@sakilacustomer.org,565,ELLEN,,2580,1,SIMPSON,,56,,3385,PG-13,2006,,,6,,9,4.99,9.99,,,,1,MATRIX SNOWMAN,,2024-05-10T20:34:54.629Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json
1,47 MySakila Drive,1,5.99,,399,Alberta,DANNY.ISOM@sakilacustomer.org,396,DANNY,,1824,1,ISOM,,62,,10783,G,2006,,,5,,10,4.99,18.99,,,,1,HANGING DEEP,,2024-05-10T20:34:54.629Z,dbfs:/FileStore/project%20data/stream/rental/fact_rental.json


In [0]:
%sql
-- Inspect the column types that Auto Loader inferred for the streamed data.
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
active,bigint,
address,string,
address_key,bigint,
amount,double,
customer_id,bigint,
customer_key,bigint,
district,string,
email,string,
film_key,bigint,
first_name,string,


In [0]:
%sql
-- Silver view: enrich the streamed rental rows (s) with the customer (c),
-- inventory (i) and rental (r) dimension tables. Columns whose output name
-- equals the source column name are listed without an alias.
--
-- NOTE(review): in the bronze preview the fact_rental.json rows carry
-- customer_key / inventory_key / rental_key while customer_id / inventory_id /
-- rental_id are NULL, and the silver output shows every measure taken from s
-- as NULL. The joins below may need to use the *_key columns -- confirm
-- against the files in the stream directory.
-- NOTE(review): store_address_id is populated from the customer's address_id;
-- confirm that this is the intended source.
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT
      -- store / inventory
      c.store_id,
      s.store_key,
      i.inventory_id,
      s.inventory_key,
      -- customer
      c.customer_id,
      s.customer_key,
      c.first_name        AS customer_first_name,
      c.last_name         AS customer_last_name,
      c.email             AS customer_email,
      c.active,
      -- address
      c.address_id        AS store_address_id,
      s.address_key       AS store_address_key,
      s.address           AS store_address,
      s.district          AS store_district,
      r.staff_id,
      -- film
      i.film_id,
      s.film_key,
      s.title             AS film_title,
      s.language_id       AS film_language_id,
      s.release_year      AS film_release_year,
      s.rating            AS film_rating,
      s.length            AS film_length,
      -- rental
      r.rental_id,
      s.rental_key,
      s.rental_date_key,
      r.rental_date,
      s.return_date_key,
      r.return_date,
      s.rental_rate,
      s.rental_duration,
      -- payment
      s.amount            AS rent_amount,
      s.replacement_cost,
      s.payment_key,
      s.payment_date_key
  FROM rentals_silver_tempview AS s
  INNER JOIN sakila_dlh.dim_customer  AS c ON s.customer_id  = c.customer_id
  INNER JOIN sakila_dlh.dim_inventory AS i ON s.inventory_id = i.inventory_id
  INNER JOIN sakila_dlh.dim_rentals   AS r ON s.rental_id    = r.rental_id
)

In [0]:
# Start a streaming query that appends the enriched silver view to the Delta
# table "fact_rentals_silver". The table name is unqualified, so it resolves
# against the session's current database. Progress is tracked in a checkpoint
# directory under the silver output path, and the returned StreamingQuery
# handle is displayed as the cell output.
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7efc1c46f340>

In [0]:
%sql
-- Query the silver table as it stands once the streaming writer has committed data.
SELECT * FROM fact_rentals_silver

store_id,store_key,inventory_id,inventory_key,customer_id,customer_key,customer_first_name,customer_last_name,customer_email,active,store_address_id,store_address_key,store_address,store_district,staff_id,film_id,film_key,film_title,film_language_id,film_release_year,film_rating,film_length,rental_id,rental_key,rental_date_key,rental_date,return_date_key,return_date,rental_rate,rental_duration,rent_amount,replacement_cost,payment_key,payment_date_key
1,,367,,130,,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,1,134,,,,1,80,,,,,,,1,,,2005-05-24T22:53:30Z,,2005-05-26T22:04:30Z,,,,,,
1,,389,,316,,STEVEN,CURLEY,STEVEN.CURLEY@sakilacustomer.org,1,321,,,,2,86,,,,,,,16,,,2005-05-25T00:43:11Z,,2005-05-26T04:42:11Z,,,,,,
2,,830,,575,,ISAAC,OGLESBY,ISAAC.OGLESBY@sakilacustomer.org,1,581,,,,1,181,,,,,,,17,,,2005-05-25T01:06:36Z,,2005-05-27T00:43:36Z,,,,,,
2,,146,,388,,CRAIG,MORRELL,CRAIG.MORRELL@sakilacustomer.org,1,393,,,,2,31,,,,,,,21,,,2005-05-25T01:59:46Z,,2005-05-26T01:01:46Z,,,,,,
1,,727,,509,,RAUL,FORTIER,RAUL.FORTIER@sakilacustomer.org,1,514,,,,2,159,,,,,,,22,,,2005-05-25T02:19:23Z,,2005-05-26T04:52:23Z,,,,,,
1,,611,,44,,MARIE,TURNER,MARIE.TURNER@sakilacustomer.org,1,48,,,,2,132,,,,,,,29,,,2005-05-25T03:47:12Z,,2005-05-30T00:31:12Z,,,,,,
1,,403,,535,,JAVIER,ELROD,JAVIER.ELROD@sakilacustomer.org,1,541,,,,1,89,,,,,,,37,,,2005-05-25T04:44:31Z,,2005-05-29T01:03:31Z,,,,,,
1,,380,,523,,HARVEY,GUAJARDO,HARVEY.GUAJARDO@sakilacustomer.org,1,529,,,,2,84,,,,,,,42,,,2005-05-25T05:24:58Z,,2005-05-31T02:47:58Z,,,,,,
1,,330,,470,,GORDON,ALLARD,GORDON.ALLARD@sakilacustomer.org,1,475,,,,1,73,,,,,,,60,,,2005-05-25T08:58:25Z,,2005-05-30T14:14:25Z,,,,,,
1,,261,,419,,CHAD,CARBONE,CHAD.CARBONE@sakilacustomer.org,1,424,,,,1,58,,,,,,,62,,,2005-05-25T09:18:52Z,,2005-05-30T10:55:52Z,,,,,,


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
-- Inspect the column names and types of the silver fact table.
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
store_id,int,
store_key,bigint,
inventory_id,int,
inventory_key,bigint,
customer_id,int,
customer_key,bigint,
customer_first_name,string,
customer_last_name,string,
customer_email,string,
active,int,


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS (CREATE TABLE AS SELECT) approach. For each customer and inventory item, the table holds the number of rentals together with the first and last rental dates, along with the customer's first and last names.

In [0]:
%sql
-- Gold table: one row per customer and inventory item, with the number of
-- rentals and the first and last rental dates, built from the silver fact table.
-- The table is created without an ORDER BY: sorting inside the CTAS does not
-- determine the order in which a later query returns the rows.
CREATE OR REPLACE TABLE sakila_dlh.rental_dates_by_customer AS (
  SELECT 
      customer_id,
      customer_first_name,
      customer_last_name,
      inventory_id,
      COUNT(rental_id) AS RentalCount,
      MIN(rental_date) AS FirstRentalDate,
      MAX(rental_date) AS LastRentalDate
  FROM 
      sakila_dlh.fact_rentals_silver
  GROUP BY 
      customer_id, customer_first_name, customer_last_name, inventory_id
);

-- Sort at query time so the most frequent renters are listed first.
SELECT * FROM sakila_dlh.rental_dates_by_customer
ORDER BY RentalCount DESC;



customer_id,customer_first_name,customer_last_name,inventory_id,RentalCount,FirstRentalDate,LastRentalDate
302,MICHAEL,SILVERMAN,775,1,2005-05-25T15:38:46Z,2005-05-25T15:38:46Z
89,JULIA,FLORES,600,1,2005-05-29T08:30:36Z,2005-05-29T08:30:36Z
298,ERIKA,PENA,955,1,2005-05-27T10:12:20Z,2005-05-27T10:12:20Z
49,JOYCE,EDWARDS,938,1,2005-05-30T01:17:45Z,2005-05-30T01:17:45Z
105,DAWN,SULLIVAN,802,1,2005-05-27T21:36:34Z,2005-05-27T21:36:34Z
381,BOBBY,BOUDREAU,299,1,2005-05-29T23:37:00Z,2005-05-29T23:37:00Z
359,WILLIE,MARKHAM,350,1,2005-05-26T19:21:44Z,2005-05-26T19:21:44Z
69,JUDY,GRAY,515,1,2005-05-29T11:38:34Z,2005-05-29T11:38:34Z
500,REGINALD,KINDER,691,1,2005-05-27T10:45:41Z,2005-05-27T10:45:41Z
36,KATHLEEN,ADAMS,85,1,2005-05-29T04:35:29Z,2005-05-29T04:35:29Z


#### 9.0. Clean up the File System

In [0]:
# Stop the streaming writers started earlier in the notebook before deleting
# the tables and checkpoint directories they are still writing to.
for active_query in spark.streams.active:
    active_query.stop()

# The project directory name contains a space, so it is passed to
# dbutils.fs.rm() as a single string instead of as an unquoted `%fs` argument.
# base_dir is "dbfs:/FileStore/project data" (defined in the config cell).
# The second argument enables recursive deletion.
dbutils.fs.rm(base_dir, True)