In [0]:
# Section I: Prerequisites
# 1.0. Import Required Libraries
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "hy6ppm-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw2"

connection_properties = {
  "user" : "yanghx",
  "password" : "Thereis_123!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0.km3agjq"
atlas_database_name = "sakila_dw2"
atlas_user_name = "hy6ppm"
atlas_password = "Yanghx0917"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/ds2002-lab06"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/finalproject_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rental_stream_dir = f"{stream_dir}/orders"

rental_output_bronze = f"{database_dir}/fact_rental/bronze"
rental_output_silver = f"{database_dir}/fact_rental/silver"
rental_trans_output_gold   = f"{database_dir}/fact_rental/gold"

# Delete the Streaming Files ################################## 
#dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
#dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
#dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
#dbutils.fs.rm(database_dir, True)

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Database"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://hy6ppm-mysql.mysql.database.azure.com:3306/sakila_dw2",
  dbtable "dim_date",
  user "yanghx",
  password "Thereis_123!"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20010101,2001-01-01,2001/01/01,01/01/2001,01/01/2001,2,Monday,1,1,Weekday,1,January,1,N,1,2001,2001-01,2001Q1,7,3,2001,2001-07,2001Q3
20010102,2001-01-02,2001/01/02,01/02/2001,02/01/2001,3,Tuesday,2,2,Weekday,1,January,1,N,1,2001,2001-01,2001Q1,7,3,2001,2001-07,2001Q3
20010103,2001-01-03,2001/01/03,01/03/2001,03/01/2001,4,Wednesday,3,3,Weekday,1,January,1,N,1,2001,2001-01,2001Q1,7,3,2001,2001-07,2001Q3
20010104,2001-01-04,2001/01/04,01/04/2001,04/01/2001,5,Thursday,4,4,Weekday,1,January,1,N,1,2001,2001-01,2001Q1,7,3,2001,2001-07,2001Q3
20010105,2001-01-05,2001/01/05,01/05/2001,05/01/2001,6,Friday,5,5,Weekday,1,January,1,N,1,2001,2001-01,2001Q1,7,3,2001,2001-07,2001Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_rental" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_rental
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://hy6ppm-mysql.mysql.database.azure.com:3306/sakila_dw2",
  dbtable "dim_rental",
  user "yanghx",
  password "Thereis_123!"
)


In [0]:
%sql
USE DATABASE sakila_dlh;
CREATE OR REPLACE TABLE sakila_dlh.dim_rental
COMMENT "Rental Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_rental"
AS SELECT * FROM view_rental
-- Create a new table named "sakila_dlh.dim_rental" using data from the view named "view_rental"

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_rental;

col_name,data_type,comment
rental_key,int,
rental_date,timestamp,
inventory_id,bigint,
customer_id,int,
return_date,timestamp,
staff_id,int,
last_update,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_rental LIMIT 5

rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
1,2005-05-24T22:53:30.000+0000,367,130,2005-05-26T22:04:30.000+0000,1,2006-02-15T21:30:53.000+0000
2,2005-05-24T22:54:33.000+0000,1525,459,2005-05-28T19:40:33.000+0000,1,2006-02-15T21:30:53.000+0000
3,2005-05-24T23:03:39.000+0000,1711,408,2005-06-01T22:12:39.000+0000,1,2006-02-15T21:30:53.000+0000
4,2005-05-24T23:04:41.000+0000,2452,333,2005-06-03T01:43:41.000+0000,2,2006-02-15T21:30:53.000+0000
5,2005-05-24T23:05:21.000+0000,2079,222,2005-06-02T04:33:21.000+0000,1,2006-02-15T21:30:53.000+0000


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-lab06/finalproject_data/batch/sakila_customer.csv,sakila_customer.csv,11857,1683913512000
dbfs:/FileStore/ds2002-lab06/finalproject_data/batch/sakila_customer.json,sakila_customer.json,71698,1683913512000
dbfs:/FileStore/ds2002-lab06/finalproject_data/batch/sakila_fact_rental.json,sakila_fact_rental.json,92412,1683913512000
dbfs:/FileStore/ds2002-lab06/finalproject_data/batch/sakila_film.csv,sakila_film.csv,203420,1683913512000
dbfs:/FileStore/ds2002-lab06/finalproject_data/batch/sakila_film.json,sakila_film.json,462531,1683913511000
dbfs:/FileStore/ds2002-lab06/finalproject_data/batch/sakila_payment.csv,sakila_payment.csv,36243,1683913512000
dbfs:/FileStore/ds2002-lab06/finalproject_data/batch/sakila_payment.json,sakila_payment.json,191953,1683913511000
dbfs:/FileStore/ds2002-lab06/finalproject_data/batch/sakila_rental.csv,sakila_rental.csv,58544,1683913511000
dbfs:/FileStore/ds2002-lab06/finalproject_data/batch/sakila_rental.json,sakila_rental.json,178475,1683913511000
dbfs:/FileStore/ds2002-lab06/finalproject_data/batch/sakila_staff.json,sakila_staff.json,600,1683914664000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002-lab06/finalproject_data/batch'
json_files = {"staff" : 'sakila_staff.json', "films" : 'sakila_film.json', "payment" : 'sakila_payment.json', "customer" : 'sakila_customer.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[15]: <pymongo.results.InsertManyResult at 0x7f080c00d800>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_staff = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2").option("collection", "staff").load()
.select("staff_key", "first_name", "last_name", "address_id", "email","store_id", "active" ,"username","password","last_update")

display(df_staff)

staff_key,first_name,last_name,address_id,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [0]:
%scala
df_staff.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff

col_name,data_type,comment
staff_key,int,
first_name,string,
last_name,string,
address_id,int,
email,string,
store_id,int,
active,int,
username,string,
password,string,
last_update,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5

staff_key,first_name,last_name,address_id,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


##### 2.4.1 Fetch Supplier Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2").option("collection", "customer").load()
.select("customer_key","store_key","customer_last_name","customer_first_name")

display(df_customer)


customer_key,store_key,customer_last_name,customer_first_name
1,1,SMITH,MARY
2,1,JOHNSON,PATRICIA
3,1,WILLIAMS,LINDA
4,2,JONES,BARBARA
5,1,BROWN,ELIZABETH
6,2,DAVIS,JENNIFER
7,1,MILLER,MARIA
8,2,WILSON,SUSAN
9,2,MOORE,MARGARET
10,1,TAYLOR,DOROTHY


In [0]:
%scala
df_customer.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Suppliers Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
store_key,int,
customer_last_name,string,
customer_first_name,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_customer,
Type,MANAGED,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,store_key,customer_last_name,customer_first_name
1,1,SMITH,MARY
2,1,JOHNSON,PATRICIA
3,1,WILLIAMS,LINDA
4,2,JONES,BARBARA
5,1,BROWN,ELIZABETH


##### 2.5.1 Fetch Invoice Dimension Data from teh New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_payment = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2").option("collection", "payment").load()
.select("payment_id","customer_id","staff_id","rental_id","amount","payment_date","last_update")

display(df_payment)

payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30
3,1,1,1185,5.99,2005-06-15 00:54:12,2006-02-15 22:12:30
4,1,2,1422,0.99,2005-06-15 18:02:53,2006-02-15 22:12:30
5,1,2,1476,9.99,2005-06-15 21:08:46,2006-02-15 22:12:30
6,1,1,1725,4.99,2005-06-16 15:18:57,2006-02-15 22:12:30
7,1,1,2308,4.99,2005-06-18 08:41:48,2006-02-15 22:12:30
8,1,2,2363,0.99,2005-06-18 13:33:59,2006-02-15 22:12:30
9,1,1,3284,3.99,2005-06-21 06:24:45,2006-02-15 22:12:30
10,1,2,4526,5.99,2005-07-08 03:17:05,2006-02-15 22:12:30


In [0]:
%scala
df_payment.printSchema()

##### 2.5.2. Use the Spark DataFrame to Create a New Invoices Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_payment")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payment

col_name,data_type,comment
payment_id,int,
customer_id,int,
staff_id,int,
rental_id,int,
amount,double,
payment_date,string,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_payment LIMIT 5

payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30
3,1,1,1185,5.99,2005-06-15 00:54:12,2006-02-15 22:12:30
4,1,2,1422,0.99,2005-06-15 18:02:53,2006-02-15 22:12:30
5,1,2,1476,9.99,2005-06-15 21:08:46,2006-02-15 22:12:30


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
film_csv = f"{batch_dir}/sakila_film.csv"

df_film = spark.read.format('csv').options(header='true', inferSchema='true').load(film_csv)
display(df_film)

film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15T05:03:42.000+0000
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15T05:03:42.000+0000
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T05:03:42.000+0000
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15T05:03:42.000+0000
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15T05:03:42.000+0000
6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,1,,3,2.99,169,17.99,PG,Deleted Scenes,2006-02-15T05:03:42.000+0000
7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,1,,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes",2006-02-15T05:03:42.000+0000
8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,1,,6,4.99,54,15.99,R,Trailers,2006-02-15T05:03:42.000+0000
9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,1,,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes",2006-02-15T05:03:42.000+0000
10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,1,,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T05:03:42.000+0000


In [0]:
df_film.printSchema()

root
 |-- film_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- language_id: integer (nullable = true)
 |-- original_language_id: string (nullable = true)
 |-- rental_duration: integer (nullable = true)
 |-- rental_rate: double (nullable = true)
 |-- length: integer (nullable = true)
 |-- replacement_cost: double (nullable = true)
 |-- rating: string (nullable = true)
 |-- special_features: string (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_id,int,
title,string,
description,string,
release_year,int,
language_id,int,
original_language_id,string,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5;

film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15T05:03:42.000+0000
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15T05:03:42.000+0000
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T05:03:42.000+0000
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15T05:03:42.000+0000
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15T05:03:42.000+0000


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_payment,False
sakila_dlh,dim_rental,False
sakila_dlh,dim_staff,False
,view_date,True
,view_rental,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "rental_key BIGINT")
 .option("cloudFiles.schemaHints", "inventory_key BIGINT")
 .option("cloudFiles.schemaHints", "staff_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT") 
 .option("cloudFiles.schemaHints", "payment_key BIGINT")
 .option("cloudFiles.schemaHints", "payment_amount DECIMAL")
 .option("cloudFiles.schemaHints", "store_key BIGINT")
 .option("cloudFiles.schemaHints", "film_key BIGINT")
 .option("cloudFiles.schemaHints", "rental_date_key BIGINT") 
 .option("cloudFiles.schemaHints", "payment_date_key BIGINT")
 .option("cloudFiles.schemaHints", "return_date_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_first_name STRING")
 .option("cloudFiles.schemaHints", "customer_last_name STRING")
 .option("cloudFiles.schemaHints", "staff_first_name STRING")
 .option("cloudFiles.schemaHints", "staff_last_name STRING")
 .option("cloudFiles.schemaLocation", rental_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rental_stream_dir)
 .createOrReplaceTempView("fact_table_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM fact_table_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

customer_first_name,customer_key,customer_last_name,film_key,inventory_key,payment_amount,payment_date_key,payment_key,rental_date_key,rental_key,return_date_key,staff_first_name,staff_key,staff_last_name,store_key,_rescued_data,receipt_time,source_file
DEBORAH,25,WALKER,115,523,5.99,20050801,676,20050801,10324,20050809,Jon,2,Stephens,1,,2023-05-12T21:11:25.175+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
ANNA,33,HILL,72,323,2.99,20050801,918,20050801,10335,20050805,Mike,1,Hillyer,2,,2023-05-12T21:11:25.175+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
MARIA,7,MILLER,162,739,5.99,20050801,196,20050801,10423,20050808,Mike,1,Hillyer,1,,2023-05-12T21:11:25.175+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
MARY,1,SMITH,3,14,4.99,20050801,22,20050801,10437,20050810,Mike,1,Hillyer,1,,2023-05-12T21:11:25.175+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
MARGARET,9,MOORE,88,397,4.99,20050801,246,20050801,10454,20050804,Jon,2,Stephens,2,,2023-05-12T21:11:25.175+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
PATRICIA,2,JOHNSON,27,138,0.99,20050801,49,20050801,10466,20050806,Mike,1,Hillyer,1,,2023-05-12T21:11:25.175+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
BETTY,14,WHITE,6,27,6.99,20050801,377,20050801,10526,20050808,Mike,1,Hillyer,2,,2023-05-12T21:11:25.175+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
ANGELA,29,HERNANDEZ,135,624,5.99,20050801,801,20050801,10543,20050807,Mike,1,Hillyer,2,,2023-05-12T21:11:25.175+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
MICHELLE,21,CLARK,81,369,4.99,20050801,561,20050801,10570,20050805,Jon,2,Stephens,1,,2023-05-12T21:11:25.175+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
CAROL,18,GARCIA,200,904,1.99,20050801,485,20050801,10721,20050809,Jon,2,Stephens,2,,2023-05-12T21:11:25.175+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

Out[31]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f080ca298b0>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rental_silver_tempview"))

In [0]:
%sql
SELECT * FROM rental_silver_tempview

customer_first_name,customer_key,customer_last_name,film_key,inventory_key,payment_amount,payment_date_key,payment_key,rental_date_key,rental_key,return_date_key,staff_first_name,staff_key,staff_last_name,store_key,_rescued_data,receipt_time,source_file
DEBORAH,25,WALKER,115,523,5.99,20050801,676,20050801,10324,20050809,Jon,2,Stephens,1,,2023-05-12T21:11:28.112+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
ANNA,33,HILL,72,323,2.99,20050801,918,20050801,10335,20050805,Mike,1,Hillyer,2,,2023-05-12T21:11:28.112+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
MARIA,7,MILLER,162,739,5.99,20050801,196,20050801,10423,20050808,Mike,1,Hillyer,1,,2023-05-12T21:11:28.112+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
MARY,1,SMITH,3,14,4.99,20050801,22,20050801,10437,20050810,Mike,1,Hillyer,1,,2023-05-12T21:11:28.112+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
MARGARET,9,MOORE,88,397,4.99,20050801,246,20050801,10454,20050804,Jon,2,Stephens,2,,2023-05-12T21:11:28.112+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
PATRICIA,2,JOHNSON,27,138,0.99,20050801,49,20050801,10466,20050806,Mike,1,Hillyer,1,,2023-05-12T21:11:28.112+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
BETTY,14,WHITE,6,27,6.99,20050801,377,20050801,10526,20050808,Mike,1,Hillyer,2,,2023-05-12T21:11:28.112+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
ANGELA,29,HERNANDEZ,135,624,5.99,20050801,801,20050801,10543,20050807,Mike,1,Hillyer,2,,2023-05-12T21:11:28.112+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
MICHELLE,21,CLARK,81,369,4.99,20050801,561,20050801,10570,20050805,Jon,2,Stephens,1,,2023-05-12T21:11:28.112+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json
CAROL,18,GARCIA,200,904,1.99,20050801,485,20050801,10721,20050809,Jon,2,Stephens,2,,2023-05-12T21:11:28.112+0000,dbfs:/FileStore/ds2002-lab06/finalproject_data/stream/orders/sakila_factrental_03.json


In [0]:
%sql
DESCRIBE EXTENDED rental_silver_tempview

col_name,data_type,comment
customer_first_name,string,
customer_key,bigint,
customer_last_name,string,
film_key,bigint,
inventory_key,bigint,
payment_amount,double,
payment_date_key,bigint,
payment_key,bigint,
rental_date_key,bigint,
rental_key,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rental_silver_tempview AS (
  SELECT 
      r.rental_key,
      r.staff_key,
      s.last_name AS staff_last_name,
      s.first_name AS staff_first_name,
      r.customer_key,
      c.customer_last_name AS customer_last_name,
      c.customer_first_name AS customer_first_name,
      r.film_key,
      f.title AS film_title,
      f.release_year AS film_release_year,
      f.rental_duration AS film_rental_duration,
      f.rental_rate AS film_rental_rate,
      f.replacement_cost AS film_replacement_cost,
      r.rental_date_key,
      rental_d.day_name_of_week AS rental_day_name_of_week,
      rental_d.day_of_month AS rental_day_of_month,
      rental_d.weekday_weekend AS rental_weekday_weekend,
      rental_d.month_name AS rental_month_name,
      rental_d.calendar_quarter AS rental_quarter,
      rental_d.calendar_year AS rental_year,
      r.return_date_key,
      return_d.day_name_of_week AS return_day_name_of_week,
      return_d.day_of_month AS return_day_of_month,
      return_d.weekday_weekend AS return_weekday_weekend,
      return_d.month_name AS return_month_name,
      return_d.calendar_quarter AS return_calendar_quarter,
      return_d.calendar_year AS return_calendar_year
  FROM rental_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_staff AS s
  ON s.staff_key = r.staff_key
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = r.customer_key
  INNER JOIN sakila_dlh.dim_film AS f
  ON f.film_id = r.film_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS return_d
  ON return_d.date_key = r.return_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rental_d
  ON rental_d.date_key = r.rental_date_key
)

In [0]:
(spark.table("fact_rental_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

Out[36]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f080c1b0130>

In [0]:
%sql
SELECT * FROM fact_rentals_silver

rental_key,staff_key,staff_last_name,staff_first_name,customer_key,customer_last_name,customer_first_name,film_key,film_title,film_release_year,film_rental_duration,film_rental_rate,film_replacement_cost,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_calendar_quarter,return_calendar_year


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
rental_key,bigint,
staff_key,bigint,
staff_last_name,string,
staff_first_name,string,
customer_key,bigint,
customer_last_name,string,
customer_first_name,string,
film_key,bigint,
film_title,string,
film_release_year,int,


##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT customer_key AS CustomerID
  , customer_last_name AS LastName
  , customer_first_name AS FirstName
  , rental_month_name AS RentalMonth
  , COUNT(rental_key) AS RentalCount
FROM sakila_dlh.fact_rentals_silver
GROUP BY CustomerID, LastName, FirstName, RentalMonth
ORDER BY RentalCount DESC


CustomerID,LastName,FirstName,RentalMonth,RentalCount
5,BROWN,ELIZABETH,July,7
25,WALKER,DEBORAH,July,6
15,HARRIS,HELEN,August,5
26,HALL,JESSICA,July,5
34,SCOTT,REBECCA,July,5
13,JACKSON,KAREN,August,4
25,WALKER,DEBORAH,August,4
21,CLARK,MICHELLE,August,4
32,LOPEZ,AMY,July,4
11,ANDERSON,LISA,July,4


In [0]:
%sql
SELECT rc.CustomerID
  , r.customer_last_name AS CustomerName
  , r.rental_key AS RentalID
  , rc.RentalCount
FROM sakila_dlh.fact_rentals_silver AS r
INNER JOIN (
  SELECT customer_key AS CustomerID
  , COUNT(rental_key) AS RentalCount
  FROM sakila_dlh.fact_rentals_silver
  GROUP BY customer_key
) AS rc
ON rc.CustomerID = r.customer_key
ORDER BY RentalCount DESC

CustomerID,CustomerName,RentalID,RentalCount
25,WALKER,1033,12
25,WALKER,10324,12
25,WALKER,5881,12
25,WALKER,12922,12
25,WALKER,2901,12
25,WALKER,14193,12
25,WALKER,6653,12
25,WALKER,14236,12
25,WALKER,4404,12
25,WALKER,6905,12


#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/ds2002-lab06/