In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Parameters
    ----------
    user_id, pwd : Atlas credentials; percent-escaped before being placed in the URI.
    cluster_name : cluster host prefix (``<cluster_name>.mongodb.net``).
    db_name      : database to query.
    collection   : collection to query.
    conditions   : filter document; a falsy value matches every document.
    projection   : projection document; a falsy value returns all fields.
    sort         : sort specification passed to ``Cursor.sort``; a falsy value skips sorting.

    Each of ``conditions``, ``projection`` and ``sort`` is applied independently, so a
    filter is honoured even when no projection or sort is supplied.

    Returns
    -------
    DataFrame built (via the module-level ``pd``) from the list of fetched documents.
    '''
    from urllib.parse import quote_plus

    # Reserved characters ('@', ':', '/', ...) in the credentials would otherwise
    # corrupt the connection string.
    mongo_uri = (f"mongodb+srv://{quote_plus(str(user_id))}:{quote_plus(str(pwd))}"
                 f"@{cluster_name}.mongodb.net/{db_name}")
    client = pymongo.MongoClient(mongo_uri)
    try:
        db = client[db_name]
        # An empty projection dict is passed as None so that all fields are returned.
        cursor = db[collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        dframe = pd.DataFrame(list(cursor))
    finally:
        # Always release the connection, even when the query fails.
        client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''(Re)create MongoDB Atlas collections from local JSON files.

    Parameters
    ----------
    user_id, pwd  : Atlas credentials; percent-escaped before being placed in the URI.
    cluster_name  : cluster host prefix (``<cluster_name>.mongodb.net``).
    db_name       : target database.
    src_file_path : local directory that holds the JSON files.
    json_files    : mapping of collection name -> JSON file name (relative to src_file_path).

    Every target collection is dropped before its documents are inserted, so running
    this again with the same inputs yields the same collections. A JSON file may contain
    either a list of documents or a single top-level document.

    Returns
    -------
    The ``InsertManyResult`` of the last insert performed, or ``None`` when nothing
    was inserted (for example when ``json_files`` is empty).
    '''
    from urllib.parse import quote_plus

    # Reserved characters ('@', ':', '/', ...) in the credentials would otherwise
    # corrupt the connection string.
    mongo_uri = (f"mongodb+srv://{quote_plus(str(user_id))}:{quote_plus(str(pwd))}"
                 f"@{cluster_name}.mongodb.net/{db_name}")
    client = pymongo.MongoClient(mongo_uri)
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r') as openfile:
                json_object = json.load(openfile)
            # insert_many() needs a list of documents; wrap a lone top-level object.
            documents = json_object if isinstance(json_object, list) else [json_object]
            # insert_many() rejects an empty list; the collection is simply left empty.
            if documents:
                result = db[collection_name].insert_many(documents)
    finally:
        # Always release the connection, even when a file or insert fails.
        client.close()

    return result

In [0]:
# Azure MySQL Server Connection Information ###################
# NOTE(review): credentials in this cell (and in the %sql / %scala cells below) are
# committed in plain text. Move them into a Databricks secret scope
# (dbutils.secrets.get) before sharing this notebook.
jdbc_hostname = "hfx6mh-mysql.mysql.database.azure.com"
jdbc_port = 3306
# NOTE(review): the JDBC views below read from "sakila2", not from this value — confirm
# which database is intended.
src_database = "sakila"

connection_properties = {
  "user" : "kliu",
  "password" : "password1!",
  "driver" : "org.mariadb.jdbc.Driver"
}


# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "midtermproject.jmyqn"
atlas_database_name = "sakila2"
atlas_user_name = "kimmy"
atlas_password = "pwd1"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/final_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/sakila"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rental_stream_dir = f"{stream_dir}/rentals"

rental_output_bronze = f"{database_dir}/fact_rentals/bronze"
rental_output_silver = f"{database_dir}/fact_rentals/silver"
rental_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Database and Streaming Files #####################
# The streaming outputs and checkpoints live under f"{database_dir}/fact_rentals",
# so one recursive delete of database_dir clears both.
dbutils.fs.rm(database_dir, True)

True

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.


In [0]:
%sql
-- Start from a clean slate: remove the metadata database together with all of its tables.
DROP SCHEMA IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
-- Register the lakehouse database; its files live under the same directory as the
-- Python config cell's database_dir.
CREATE SCHEMA IF NOT EXISTS sakila_dlh
  COMMENT "DS-2002 Sakila Database"
  LOCATION "dbfs:/FileStore/final_data/sakila_dlh"
  WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Sakila Database");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 


In [0]:
%sql
-- Expose the dim_date table of the Azure MySQL "sakila2" database as a temporary view.
-- Replace the server name, user name and password with your own values.
-- NOTE(review): these credentials are hard-coded and duplicated in the Python config
-- cell; prefer a secret store.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url      "jdbc:mysql://hfx6mh-mysql.mysql.database.azure.com:3306/sakila2",
  dbtable  "dim_date",
  user     "kliu",
  password "password1!"
)

In [0]:
%sql
USE sakila_dlh;

-- Copy the JDBC-backed date view into a table stored under the database directory.
CREATE OR REPLACE TABLE sakila_dlh.dim_date
  COMMENT "Date Dimension Table"
  LOCATION "dbfs:/FileStore/final_data/sakila_dlh/dim_date"
AS
SELECT *
FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Show column types and table metadata of the date dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
-- Preview the first five rows of the date dimension.
SELECT *
FROM sakila_dlh.dim_date
LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Customer Dimension Data from an Azure MySQL Database.

In [0]:
%sql
-- Expose the dim_customer table of the Azure MySQL "sakila2" database as a temporary view.
-- Replace the server name, user name and password with your own values.
-- NOTE(review): these credentials are hard-coded and duplicated in the Python config
-- cell; prefer a secret store.
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url      "jdbc:mysql://hfx6mh-mysql.mysql.database.azure.com:3306/sakila2",
  dbtable  "dim_customer",
  user     "kliu",
  password "password1!"
)

In [0]:
%sql
USE sakila_dlh;

-- Copy the JDBC-backed customer view into a table stored under the database directory.
CREATE OR REPLACE TABLE sakila_dlh.dim_customer
  COMMENT "Customer Dimension Table"
  LOCATION "dbfs:/FileStore/final_data/sakila_dlh/dim_customer"
AS
SELECT *
FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Show column types and table metadata of the customer dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,bigint,
customer_id,bigint,
store_id,bigint,
first_name,varchar(65535),
last_name,varchar(65535),
email,varchar(65535),
address_id,bigint,
active,bigint,
create_date,varchar(65535),
last_update,varchar(65535),


In [0]:
%sql
-- Preview the first five rows of the customer dimension.
SELECT *
FROM sakila_dlh.dim_customer
LIMIT 5

customer_key,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
5,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
# List the batch (cold-path) source files available on DBFS.
batch_files = dbutils.fs.ls(batch_dir)
display(batch_files)

path,name,size,modificationTime
dbfs:/FileStore/final_data/sakila/batch/dim_customer.csv,dim_customer.csv,63726,1733444458000
dbfs:/FileStore/final_data/sakila/batch/dim_date.json,dim_date.json,663793,1733444458000
dbfs:/FileStore/final_data/sakila/batch/dim_film.csv,dim_film.csv,212232,1733444458000
dbfs:/FileStore/final_data/sakila/batch/dim_inventory.json,dim_inventory.json,613878,1733444458000
dbfs:/FileStore/final_data/sakila/batch/dim_rental.json,dim_rental.json,235368,1733444459000
dbfs:/FileStore/final_data/sakila/batch/dim_store.json,dim_store.json,263,1733444458000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent: it drops each target collection before re-inserting the documents from its JSON file.

In [0]:
# Python's open() cannot read "dbfs:" URIs; it needs the local "/dbfs" mount of the
# same folder. Deriving it from batch_dir keeps the two paths from drifting apart.
source_dir = batch_dir.replace("dbfs:", "/dbfs", 1)
# Collection name -> JSON file (in source_dir) used to (re)build it.
json_files = {"inventory" : 'dim_inventory.json'
              , "rental" : 'dim_rental.json'
              , "store" : 'dim_store.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f48b4ee9000>

##### 2.3.1. Fetch Inventory Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// MongoDB Atlas connection settings shared by the Scala reader cells below.
// NOTE(review): credentials are duplicated from the Python config cell and committed in
// plain text; move them into a Databricks secret scope.
val userName = "kimmy"
val pwd = "pwd1"
val clusterName = "midtermproject.jmyqn"

// The user name and password must be URL-encoded inside a connection string. Otherwise
// characters such as '@', ':' or '/' break URI parsing. Plain alphanumeric values are
// left unchanged by the encoding.
val atlas_uri = {
  val encode = (s: String) => java.net.URLEncoder.encode(s, "UTF-8")
  s"mongodb+srv://${encode(userName)}:${encode(pwd)}@$clusterName.mongodb.net/?retryWrites=true&w=majority"
}

In [0]:
%scala

// Inventory dimension: read the "inventory" collection of the sakila2 Atlas database.
// Each document carries the inventory columns plus the film attributes merged into it.
// Uses the same "uri" option as the rental and store readers, for consistency.
val df_inventory = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "sakila2")
  .option("collection", "inventory")
  .option("uri", atlas_uri)
  .load()
  .select("inventory_key","inventory_id","film_id","store_id","last_update_x","film_key","description","release_year","language_id","rental_duration","rental_rate","length","replacement_cost","rating","special_features","last_update_y", "category_id","category_name")

display(df_inventory)

inventory_key,inventory_id,film_id,store_id,last_update_x,film_key,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_y,category_id,category_name
1,1,1,1,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
2,2,1,1,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
3,3,1,1,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
4,4,1,1,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
5,5,1,2,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
6,6,1,2,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
7,7,1,2,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
8,8,1,2,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
9,9,2,2,2006-02-15 05:09:17,2,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42,11,Horror
10,10,2,2,2006-02-15 05:09:17,2,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42,11,Horror


In [0]:
%scala
// Print the column names and types inferred from the inventory collection.
df_inventory.printSchema()

In [0]:
%scala
// Persist the inventory dimension as a Delta table, replacing any previous contents.
df_inventory.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
-- Show column types and table metadata of the inventory dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_inventory

col_name,data_type,comment
inventory_key,int,
inventory_id,int,
film_id,int,
store_id,int,
last_update_x,string,
film_key,int,
description,string,
release_year,int,
language_id,int,
rental_duration,int,


In [0]:
%sql
-- Preview the first five rows of the inventory dimension.
SELECT *
FROM sakila_dlh.dim_inventory
LIMIT 5

inventory_key,inventory_id,film_id,store_id,last_update_x,film_key,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_y,category_id,category_name
1,1,1,1,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
2,2,1,1,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
3,3,1,1,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
4,4,1,1,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary
5,5,1,2,2006-02-15 05:09:17,1,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,6,Documentary


##### 2.4.1. Fetch Rental Dimension Data from the New MongoDB Collection

In [0]:
%scala

import com.mongodb.spark._

// Rental dimension: read the "rental" collection of the sakila2 Atlas database.
val df_rental = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", atlas_uri)
  .option("database", "sakila2")
  .option("collection", "rental")
  .load()
  .select("rental_key", "rental_id", "rental_date", "inventory_id", "customer_id","staff_id","last_update","rental_return_date_key")

display(df_rental)

rental_key,rental_id,rental_date,inventory_id,customer_id,staff_id,last_update,rental_return_date_key
1,1,2005-05-24 22:53:30,367,130,1,2006-02-15 21:30:53,20050526
2,2,2005-05-24 22:54:33,1525,459,1,2006-02-15 21:30:53,20050528
3,3,2005-05-24 23:03:39,1711,408,1,2006-02-15 21:30:53,20050601
4,4,2005-05-24 23:04:41,2452,333,2,2006-02-15 21:30:53,20050603
5,5,2005-05-24 23:05:21,2079,222,1,2006-02-15 21:30:53,20050602
6,6,2005-05-24 23:08:07,2792,549,1,2006-02-15 21:30:53,20050527
7,7,2005-05-24 23:11:53,3995,269,2,2006-02-15 21:30:53,20050529
8,8,2005-05-24 23:31:46,2346,239,2,2006-02-15 21:30:53,20050527
9,9,2005-05-25 00:00:40,2580,126,1,2006-02-15 21:30:53,20050528
10,10,2005-05-25 00:02:21,1824,399,2,2006-02-15 21:30:53,20050531


In [0]:
%scala
// Print the column names and types inferred from the rental collection.
df_rental.printSchema()

In [0]:
%scala
// Persist the rental dimension as a Delta table, replacing any previous contents.
df_rental.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("sakila_dlh.dim_rental")

In [0]:
%sql
-- Show column types and table metadata of the rental dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_rental

col_name,data_type,comment
rental_key,int,
rental_id,int,
rental_date,string,
inventory_id,int,
customer_id,int,
staff_id,int,
last_update,string,
rental_return_date_key,int,
,,
# Delta Statistics Columns,,


In [0]:
%sql
-- Preview the first five rows of the rental dimension.
SELECT *
FROM sakila_dlh.dim_rental
LIMIT 5

rental_key,rental_id,rental_date,inventory_id,customer_id,staff_id,last_update,rental_return_date_key
1,1,2005-05-24 22:53:30,367,130,1,2006-02-15 21:30:53,20050526
2,2,2005-05-24 22:54:33,1525,459,1,2006-02-15 21:30:53,20050528
3,3,2005-05-24 23:03:39,1711,408,1,2006-02-15 21:30:53,20050601
4,4,2005-05-24 23:04:41,2452,333,2,2006-02-15 21:30:53,20050603
5,5,2005-05-24 23:05:21,2079,222,1,2006-02-15 21:30:53,20050602


##### 2.5.1. Fetch Store Dimension Data from the New MongoDB Collection

In [0]:
%scala

import com.mongodb.spark._

// Store dimension: read the "store" collection of the sakila2 Atlas database.
// NOTE(review): unlike inventory and rental, this DataFrame is never saved to a
// sakila_dlh table (dim_store is absent from SHOW TABLES); confirm whether that is intended.
val df_store = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", atlas_uri)
  .option("database", "sakila2")
  .option("collection", "store")
  .load()
  .select("store_key", "store_id", "manager_staff_id", "address_id", "last_update")

display(df_store)

store_key,store_id,manager_staff_id,address_id,last_update
1,1,1,1,2006-02-15 04:57:12
2,2,2,2,2006-02-15 04:57:12


In [0]:
%scala
// Print the column names and types inferred from the store collection.
df_store.printSchema()

#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
customer_csv = f"{batch_dir}/dim_customer.csv"

# The file has a header row; let Spark infer the column types.
df_customer = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(customer_csv)
)
display(df_customer)

customer_key,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
3,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
4,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
5,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
6,6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
7,7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
8,8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
9,9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
10,10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z


In [0]:
# Print the column types Spark inferred from the customer CSV.
df_customer.printSchema()

root
 |-- customer_key: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
film_csv = f"{batch_dir}/dim_film.csv"

# The file has a header row; let Spark infer the column types.
df_film = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(film_csv)
)
display(df_film)

film_key,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update,category_id,category_name,film_id
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15T05:03:42Z,6,Documentary,1
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z,11,Horror,2
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z,6,Documentary,3
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15T05:03:42Z,11,Horror,4
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15T05:03:42Z,8,Family,5
6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,1,3,2.99,169,17.99,PG,Deleted Scenes,2006-02-15T05:03:42Z,9,Foreign,6
7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,1,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z,5,Comedy,7
8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,1,6,4.99,54,15.99,R,Trailers,2006-02-15T05:03:42Z,11,Horror,8
9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,1,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z,11,Horror,9
10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,1,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z,15,Sports,10


In [0]:
# Print the column types Spark inferred from the film CSV.
df_film.printSchema()

root
 |-- film_key: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- language_id: integer (nullable = true)
 |-- rental_duration: integer (nullable = true)
 |-- rental_rate: double (nullable = true)
 |-- length: integer (nullable = true)
 |-- replacement_cost: double (nullable = true)
 |-- rating: string (nullable = true)
 |-- special_features: string (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- film_id: integer (nullable = true)



In [0]:
# Persist the film dimension as a Delta table, replacing any previous contents.
(df_film.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("sakila_dlh.dim_film"))

In [0]:
%sql
-- Show column types and table metadata of the film dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_key,int,
title,string,
description,string,
release_year,int,
language_id,int,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,
rating,string,


In [0]:
%sql
-- Preview the first five rows of the film dimension.
SELECT *
FROM sakila_dlh.dim_film
LIMIT 5;

film_key,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update,category_id,category_name,film_id
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15T05:03:42Z,6,Documentary,1
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z,11,Horror,2
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z,6,Documentary,3
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15T05:03:42Z,11,Horror,4
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15T05:03:42Z,8,Family,5


##### Verify Dimension Tables

In [0]:
%sql
-- Make sakila_dlh the current database, then list its tables (temporary views included).
USE sakila_dlh;

SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_rental,False
,_sqldf,True
,display_query_1,True
,display_query_10,True
,display_query_11,True
,display_query_12,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use Auto Loader to Process Streaming (Hot Path) Rentals Fact Data
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Bronze ingestion with Auto Loader ("cloudFiles"): pick up the raw rental JSON files in
# rental_stream_dir as they arrive. Column types are inferred, and the inferred schema
# is tracked under rental_output_bronze. The stream is exposed as a temp view for the
# SQL cells that follow.
rental_raw_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", rental_output_bronze)
    .option("cloudFiles.inferColumnTypes", "true")
    .option("multiLine", "true")
    .load(rental_stream_dir)
)
rental_raw_stream.createOrReplaceTempView("rental_raw_tempview")

In [0]:
%sql
-- Bronze view: every raw rental record plus lineage metadata (when it was read and
-- which source file it came from).
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS
SELECT
  *,
  current_timestamp() AS receipt_time,
  input_file_name()   AS source_file
FROM rental_raw_tempview

In [0]:
%sql
-- Inspect the streaming bronze view (this starts a display stream).
SELECT *
FROM rentals_bronze_tempview

customer_id,fact_rental_key,film_id,inventory_key,last_update_key,rental_date_key,rental_id,return_date_key,staff_id,_rescued_data,receipt_time,source_file
596,141,174,797,20060215,20050528,667,20050531,1,,2024-12-06T00:22:37.382Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
201,142,51,227,20060215,20050528,670,20050606,2,,2024-12-06T00:22:37.382Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
306,143,152,697,20060215,20050528,672,20050606,2,,2024-12-06T00:22:37.382Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
526,144,201,907,20060215,20050528,679,20050606,2,,2024-12-06T00:22:37.382Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
571,145,162,742,20060215,20050529,689,20050603,2,,2024-12-06T00:22:37.382Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
18,146,174,800,20060215,20050529,692,20050602,2,,2024-12-06T00:22:37.382Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
190,147,138,635,20060215,20050529,693,20050603,2,,2024-12-06T00:22:37.382Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
399,148,129,592,20060215,20050529,694,20050605,1,,2024-12-06T00:22:37.382Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
246,149,26,130,20060215,20050529,699,20050604,2,,2024-12-06T00:22:37.382Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
117,150,111,500,20060215,20050529,700,20050530,1,,2024-12-06T00:22:37.382Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json


In [0]:
# Persist the bronze view as an append-only Delta streaming table. The checkpoint
# directory lets the query resume where it left off.
bronze_writer = (
    spark.table("rentals_bronze_tempview")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{rental_output_bronze}/_checkpoint")
)
bronze_writer.table("fact_rentals_bronze")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f48b44b4b50>

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Stream the bronze Delta table back in as the source of the silver layer.
bronze_stream = spark.readStream.table("fact_rentals_bronze")
bronze_stream.createOrReplaceTempView("rentals_silver_tempview")

In [0]:
%sql
-- Inspect the silver-layer source stream (this starts a display stream).
SELECT *
FROM rentals_silver_tempview

customer_id,fact_rental_key,film_id,inventory_key,last_update_key,rental_date_key,rental_id,return_date_key,staff_id,_rescued_data,receipt_time,source_file
596,141,174,797,20060215,20050528,667,20050531,1,,2024-12-06T00:22:38.822Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
201,142,51,227,20060215,20050528,670,20050606,2,,2024-12-06T00:22:38.822Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
306,143,152,697,20060215,20050528,672,20050606,2,,2024-12-06T00:22:38.822Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
526,144,201,907,20060215,20050528,679,20050606,2,,2024-12-06T00:22:38.822Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
571,145,162,742,20060215,20050529,689,20050603,2,,2024-12-06T00:22:38.822Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
18,146,174,800,20060215,20050529,692,20050602,2,,2024-12-06T00:22:38.822Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
190,147,138,635,20060215,20050529,693,20050603,2,,2024-12-06T00:22:38.822Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
399,148,129,592,20060215,20050529,694,20050605,1,,2024-12-06T00:22:38.822Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
246,149,26,130,20060215,20050529,699,20050604,2,,2024-12-06T00:22:38.822Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json
117,150,111,500,20060215,20050529,700,20050530,1,,2024-12-06T00:22:38.822Z,dbfs:/FileStore/final_data/sakila/stream/rentals/fact_rental03.json


In [0]:
%sql
-- Show the column types of the silver-layer source view.
DESCRIBE TABLE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_id,bigint,
fact_rental_key,bigint,
film_id,bigint,
inventory_key,bigint,
last_update_key,bigint,
rental_date_key,bigint,
rental_id,bigint,
return_date_key,bigint,
staff_id,bigint,
_rescued_data,string,


In [0]:
%sql
/* Silver view: enrich each streamed rental fact with customer, film, inventory and rental
   attributes from the static dimension tables (stream-static inner joins).
   The fact carries natural ids for customer, film and rental. Those ids are matched to
   the dimensions' *_id columns, not to their surrogate *_key columns; the two are only
   coincidentally equal in the current data. The inventory join uses inventory_key on
   both sides. */
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT
    f.fact_rental_key,
    f.customer_id,
    c.first_name AS customer_first_name,
    c.last_name AS customer_last_name,
    c.email AS customer_email,
    f.film_id,
    m.title AS film_title,
    m.description AS film_description,
    m.release_year AS film_release_year,
    m.language_id AS film_langauge_id,  -- NOTE(review): misspelled column name kept for downstream compatibility
    m.rental_duration AS film_rental_duration,
    m.length AS film_length,
    m.replacement_cost AS film_replacement_cost,
    m.rating AS film_rating,
    m.special_features AS film_special_features,
    m.category_name AS film_category_name,
    m.category_id AS film_category,
    f.inventory_key,
    i.special_features AS inventory_special_features,
    i.replacement_cost AS inventory_replacement_cost,
    i.rental_rate AS inventory_rental_rate,
    i.rental_duration AS inventory_rental_duration,
    f.return_date_key,
    f.rental_id,
    r.rental_date
  FROM rentals_silver_tempview AS f
  INNER JOIN sakila_dlh.dim_customer AS c
    ON f.customer_id = c.customer_id
  INNER JOIN sakila_dlh.dim_film AS m
    ON f.film_id = m.film_id
  INNER JOIN sakila_dlh.dim_inventory AS i
    ON f.inventory_key = i.inventory_key
  -- dim_date contributes no columns. This inner join only keeps rentals whose
  -- return_date_key exists in the date dimension.
  INNER JOIN sakila_dlh.dim_date AS d
    ON f.return_date_key = d.date_key
  INNER JOIN sakila_dlh.dim_rental AS r
    ON f.rental_id = r.rental_id
);

In [0]:
# Persist the enriched silver view as an append-only Delta streaming table. The checkpoint
# directory lets the query resume where it left off.
silver_writer = (
    spark.table("fact_rentals_silver_tempview")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{rental_output_silver}/_checkpoint")
)
silver_writer.table("fact_rentals_silver")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f48c5bc4550>

In [0]:
%sql
-- Inspect the rows written so far to the silver fact table.
SELECT *
FROM fact_rentals_silver

fact_rental_key,customer_id,customer_first_name,customer_last_name,customer_email,film_id,film_title,film_description,film_release_year,film_langauge_id,film_rental_duration,film_length,film_replacement_cost,film_rating,film_special_features,film_category_name,film_category,inventory_key,inventory_special_features,inventory_replacement_cost,inventory_rental_rate,inventory_rental_duration,return_date_key,rental_id,rental_date
119,6,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,83,BLUES INSTINCT,A Insightful Documentary of a Boat And a Composer who must Meet a Forensic Psychologist in An Abandoned Fun House,2006,1,5,50,18.99,G,"Trailers,Deleted Scenes,Behind the Scenes",Family,8,375,"Trailers,Deleted Scenes,Behind the Scenes",18.99,2.99,5,20050601,577,2005-05-28 11:09:14
190,9,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,196,CRUELTY UNFORGIVEN,A Brilliant Tale of a Car And a Moose who must Battle a Dentist in Nigeria,2006,1,7,69,29.99,G,"Deleted Scenes,Behind the Scenes",Classics,4,886,"Deleted Scenes,Behind the Scenes",29.99,0.99,7,20050602,877,2005-05-30 05:48:59
211,11,LISA,ANDERSON,LISA.ANDERSON@sakilacustomer.org,86,BOOGIE AMELIE,A Lacklusture Character Study of a Husband And a Sumo Wrestler who must Succumb a Technical Writer in The Gulf of Mexico,2006,1,6,121,11.99,R,"Commentaries,Behind the Scenes",Music,12,390,"Commentaries,Behind the Scenes",11.99,4.99,6,20050607,987,2005-05-30 22:59:12
146,18,CAROL,GARCIA,CAROL.GARCIA@sakilacustomer.org,174,CONFIDENTIAL INTERVIEW,A Stunning Reflection of a Cat And a Woman who must Find a Astronaut in Ancient Japan,2006,1,6,180,13.99,NC-17,Commentaries,Music,12,800,Commentaries,13.99,4.99,6,20050602,692,2005-05-29 01:32:10
123,19,RUTH,MARTINEZ,RUTH.MARTINEZ@sakilacustomer.org,83,BLUES INSTINCT,A Insightful Documentary of a Boat And a Composer who must Meet a Forensic Psychologist in An Abandoned Fun House,2006,1,5,50,18.99,G,"Trailers,Deleted Scenes,Behind the Scenes",Family,8,377,"Trailers,Deleted Scenes,Behind the Scenes",18.99,2.99,5,20050529,591,2005-05-28 13:11:04
45,20,SHARON,ROBINSON,SHARON.ROBINSON@sakilacustomer.org,147,CHOCOLAT HARRY,A Action-Packed Epistle of a Dentist And a Moose who must Meet a Mad Cow in Ancient Japan,2006,1,5,101,16.99,NC-17,"Commentaries,Behind the Scenes",Family,8,674,"Commentaries,Behind the Scenes",16.99,0.99,5,20050602,202,2005-05-26 07:27:36
60,21,MICHELLE,CLARK,MICHELLE.CLARK@sakilacustomer.org,96,BREAKING HOME,A Beautiful Display of a Secret Agent And a Monkey who must Battle a Sumo Wrestler in An Abandoned Mine Shaft,2006,1,4,169,21.99,PG-13,"Trailers,Commentaries",New,13,435,"Trailers,Commentaries",21.99,2.99,4,20050531,260,2005-05-26 15:42:20
82,28,CYNTHIA,YOUNG,CYNTHIA.YOUNG@sakilacustomer.org,96,BREAKING HOME,A Beautiful Display of a Secret Agent And a Monkey who must Battle a Sumo Wrestler in An Abandoned Mine Shaft,2006,1,4,169,21.99,PG-13,"Trailers,Commentaries",New,13,434,"Trailers,Commentaries",21.99,2.99,4,20050530,388,2005-05-27 10:37:27
154,36,KATHLEEN,ADAMS,KATHLEEN.ADAMS@sakilacustomer.org,17,ALONE TRIP,A Fast-Paced Character Study of a Composer And a Dog who must Outgun a Boat in An Abandoned Fun House,2006,1,3,82,14.99,R,"Trailers,Behind the Scenes",Music,12,85,"Trailers,Behind the Scenes",14.99,0.99,3,20050601,716,2005-05-29 04:35:29
19,44,MARIE,TURNER,MARIE.TURNER@sakilacustomer.org,117,CANDLES GRAPES,A Fanciful Character Study of a Monkey And a Explorer who must Build a Astronaut in An Abandoned Fun House,2006,1,6,135,15.99,NC-17,"Trailers,Deleted Scenes",Games,10,535,"Trailers,Deleted Scenes",15.99,4.99,6,20050528,99,2005-05-25 16:50:20


In [0]:
%sql
-- Inspect the column schema and extended table metadata of the silver fact table.
DESCRIBE TABLE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
fact_rental_key,bigint,
customer_id,bigint,
customer_first_name,varchar(65535),
customer_last_name,varchar(65535),
customer_email,varchar(65535),
film_id,bigint,
film_title,string,
film_description,string,
film_release_year,int,
film_langauge_id,int,


##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
-- Gold aggregation: number of rentals and average inventory rental rate per customer.
-- NOTE(review): the silver view joined dim_customer on customer_key, while this
-- query joins on customer_id -- confirm the two columns hold the same value.
SELECT
    f.customer_id,
    c.first_name AS customer_first_name,
    c.last_name AS customer_last_name,
    c.email AS customer_email,
    COUNT(f.rental_id) AS total_rentals,
    -- inventory_rental_rate was already pulled from dim_inventory (joined on the
    -- same inventory_key) when the silver table was built, so no re-join is needed.
    AVG(f.inventory_rental_rate) AS avg_rental_rate
FROM fact_rentals_silver AS f
LEFT JOIN dim_customer AS c
    ON f.customer_id = c.customer_id
-- Group on the fact's own key so rentals whose customer is missing from
-- dim_customer stay per-customer instead of collapsing into one NULL group.
-- (The former LEFT JOIN to dim_rental was never referenced and could only
-- duplicate rows and inflate COUNT, so it has been removed.)
GROUP BY
    f.customer_id,
    c.first_name,
    c.last_name,
    c.email;

customer_id,customer_first_name,customer_last_name,customer_email,total_rentals,avg_rental_rate
246,MARIAN,MENDOZA,MARIAN.MENDOZA@sakilacustomer.org,2,2.99
526,KARL,SEAL,KARL.SEAL@sakilacustomer.org,1,2.99
152,ALICIA,MILLS,ALICIA.MILLS@sakilacustomer.org,1,4.99
401,TONY,CARRANZA,TONY.CARRANZA@sakilacustomer.org,1,4.99
294,SHELLY,WATTS,SHELLY.WATTS@sakilacustomer.org,1,0.99
298,ERIKA,PENA,ERIKA.PENA@sakilacustomer.org,1,0.99
409,RODNEY,MOELLER,RODNEY.MOELLER@sakilacustomer.org,1,2.99
28,CYNTHIA,YOUNG,CYNTHIA.YOUNG@sakilacustomer.org,1,2.99
266,NORA,HERRERA,NORA.HERRERA@sakilacustomer.org,1,2.99
498,GENE,SANBORN,GENE.SANBORN@sakilacustomer.org,1,2.99


#### Clean up the File System

In [0]:
# Clean-up: stop every streaming query still running in this Spark session
# before deleting the directory tree, so no stream is left writing into (or
# checkpointing to) paths that have just been removed.
# NOTE(review): presumably the streams' checkpoint/output locations live under
# /FileStore/final_data/ -- confirm against the path configuration cells.
for active_query in spark.streams.active:
    active_query.stop()

dbutils.fs.rm("/FileStore/final_data/", recurse=True)

[0;31m---------------------------------------------------------------------------[0m
[0;31mExecutionError[0m                            Traceback (most recent call last)
File [0;32m<command-2560316821073275>, line 1[0m
[0;32m----> 1[0m display(dbutils[38;5;241m.[39mfs[38;5;241m.[39mls(batch_dir))

File [0;32m/databricks/python_shell/dbruntime/dbutils.py:158[0m, in [0;36mprettify_exception_message.<locals>.f_with_exception_handling[0;34m(*args, **kwargs)[0m
[1;32m    156[0m exc[38;5;241m.[39m__context__ [38;5;241m=[39m [38;5;28;01mNone[39;00m
[1;32m    157[0m exc[38;5;241m.[39m__cause__ [38;5;241m=[39m [38;5;28;01mNone[39;00m
[0;32m--> 158[0m [38;5;28;01mraise[39;00m exc

[0;31mExecutionError[0m: An error occurred while calling o422.ls.
: java.io.FileNotFoundException: No such file or directory dbfs:/FileStore/final_data/sakila/batch
	at com.databricks.backend.daemon.data.client.DBFSV2.$anonfun$listStatus$2(DatabricksFileSystemV2.scala:193)
	at com