In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

In [0]:
# Notebook-wide configuration: connection settings, DBFS directory layout, and a
# clean-up step that removes output left behind by a previous run.
#
# NOTE(review): the MySQL and MongoDB credentials below are hard-coded in clear text
# (and repeated as literals in the %sql / %scala cells further down). They should be
# read from a secret store or environment variables and rotated -- TODO.

# Azure MySQL Server Connection Information ###################
# NOTE(review): jdbc_hostname, jdbc_port, src_database and connection_properties are not
# referenced by any cell in the visible part of the notebook; the %sql cells spell out
# the JDBC URL and credentials themselves.
jdbc_hostname = "fhc5mf-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "rmstaley",
  "password" : "Awesomeuva25!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
# Passed to set_mongo_collection() when the JSON reference data is uploaded.
atlas_cluster_name = "DS2002.fymrynp"
atlas_database_name = "sakila_dw2"
atlas_user_name = "rhiannonstaley"
atlas_password = "Awesomeuva25!"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

# NOTE(review): database_dir resolves to dbfs:/FileStore/lab_data2/lab_data2/sakila_dlh,
# whereas the CREATE DATABASE / CREATE TABLE cells use
# dbfs:/FileStore/lab_data2/sakila_dlh (a single "lab_data2"). Confirm that the two
# different locations are intentional.
base_dir = "dbfs:/FileStore/lab_data2/lab_data2"
database_dir = f"{base_dir}/{dst_database}"

# Source data: batch files (reference data) and streaming files (fact data).
data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch2"
stream_dir = f"{data_dir}/stream2"

orders_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

# Per-fact output directories, one per bronze / silver / gold layer. The streaming cells
# use them for the Auto Loader schema location and the "_checkpoint" sub-directories.
orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ##################################
# Recursive delete (second argument True) of each fact's output directory, which
# includes the schema-location and checkpoint folders written by earlier runs.
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
# The three directories removed above are sub-directories of database_dir, so this
# single recursive delete also covers them.
dbutils.fs.rm(database_dir, True)

Out[23]: True

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Fetch documents from a MongoDB Atlas collection into a DataFrame.

    Parameters:
        user_id, pwd: raw (unescaped) Atlas credentials; they are percent-escaped
            here before being placed in the connection URI.
        cluster_name: Atlas cluster host prefix, i.e. the part before ".mongodb.net".
        db_name: database to query.
        collection: name of the collection to read.
        conditions: query filter; a falsy value selects every document.
        projection: field projection; a falsy value returns all fields.
        sort: sort specification handed to ``Cursor.sort``; a falsy value leaves the
            result unsorted.

    Each of ``conditions``, ``projection`` and ``sort`` is applied independently.
    Previously the filter was silently dropped unless a projection was also given,
    and the sort was dropped unless both of the others were given.

    Returns:
        A DataFrame (``pd`` is ``pyspark.pandas`` in this notebook) built from the
        list of matching documents.
    '''
    # Local import so the function does not depend on an edit to the import cell.
    from urllib.parse import quote_plus

    # PyMongo requires the user name and password in a URI to be percent-escaped;
    # without it characters such as '@', ':', '/', '%' or '+' break or alter the login.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )

    client = pymongo.MongoClient(mongo_uri)
    try:
        # An empty filter matches everything and a None projection returns all fields,
        # so the no-argument case behaves exactly like a bare find().
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialise the cursor while the connection is still open.
        documents = list(cursor)
    finally:
        # Release the connection even when the query raises.
        client.close()

    return pd.DataFrame(documents)

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Replace MongoDB collections with the contents of JSON files.

    Parameters:
        user_id, pwd: raw (unescaped) Atlas credentials; they are percent-escaped
            here before being placed in the connection URI.
        cluster_name: Atlas cluster host prefix, i.e. the part before ".mongodb.net".
        db_name: target database.
        src_file_path: directory that contains the JSON files; it is opened with the
            built-in ``open()``, so it must be a local-filesystem style path.
        json_files: mapping of collection name -> JSON file name.

    For every entry the file is parsed first, then the collection is dropped and
    re-created from the parsed documents. Parsing before dropping means an unreadable
    or missing file no longer destroys the existing collection. A file that holds an
    empty array leaves the collection dropped and inserts nothing, because
    ``insert_many`` rejects an empty list.

    Returns:
        The ``InsertManyResult`` of the last insert that was performed, or ``None``
        when nothing was inserted (for example when ``json_files`` is empty).
    '''
    # Local import so the function does not depend on an edit to the import cell.
    from urllib.parse import quote_plus

    # PyMongo requires the user name and password in a URI to be percent-escaped.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )
    client = pymongo.MongoClient(mongo_uri)

    # Defined up front so that an empty json_files mapping returns None instead of
    # raising UnboundLocalError at the return statement.
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)

            db.drop_collection(collection_name)
            if documents:
                result = db[collection_name].insert_many(documents)
    finally:
        # Release the connection even when a file or an insert fails.
        client.close()

    return result


## Section II: Populate Dimensions by Ingesting Reference Data
#### 1.0 Fetch Reference Data from an Azure MySQL Database

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Database"
LOCATION "dbfs:/FileStore/lab_data2/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final");

##### 1.2 Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
-- Expose the "dim_date" table of the Azure MySQL database "sakila_dw" as a Spark
-- temporary view that is read over JDBC.
-- NOTE(review): the user name and password are stored in clear text in this cell;
-- move them to a secret store and rotate them.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://fhc5mf-mysql.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "rmstaley",    --Replace with your User Name
  password "Awesomeuva25!"  --Replace with your password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data2/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Film Dimension Data from an Azure MySQL database

In [0]:
%sql
-- Create a Temporary View named "view_product" that extracts the "dim_film" table
-- from the Azure MySQL "sakila_dw" database over JDBC.
-- NOTE(review): the user name and password are stored in clear text in this cell;
-- move them to a secret store and rotate them.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://fhc5mf-mysql.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_film",
  user "rmstaley",    --Replace with your User Name
  password "Awesomeuva25!"  --Replace with your password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_film" using data from the view named "view_product"
CREATE OR REPLACE TABLE sakila_dlh.dim_film
COMMENT "Film Dimension Table"
LOCATION "dbfs:/FileStore/lab_data2/sakila_dlh/dim_film"
AS SELECT * FROM view_product

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_key,int,
title,string,
description,string,
release_year,date,
language_id,int,
original_language_id,int,
rental_duration,int,
rental_rate,"decimal(4,2)",
length,int,
replacement_cost,"decimal(5,2)",


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
1,ACADEMY DINOSAUR,,2006-01-01,1,,6,0.99,86,19.99,PG,,2006-02-15T05:03:42.000+0000
2,ACE GOLDFINGER,,2006-01-01,1,,3,4.99,48,19.99,G,,2006-02-15T05:03:42.000+0000
3,ADAPTATION HOLES,,2006-01-01,1,,7,2.99,50,19.99,NC-17,,2006-02-15T05:03:42.000+0000
4,AFFAIR PREJUDICE,,2006-01-01,1,,5,2.99,117,19.99,G,,2006-02-15T05:03:42.000+0000
5,AFRICAN EGG,,2006-01-01,1,,6,2.99,130,19.99,G,,2006-02-15T05:03:42.000+0000


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # 'dbfs:/FileStore/lab_data2/lab_data2/retail/batch2'

path,name,size,modificationTime
dbfs:/FileStore/lab_data2/lab_data2/retail/batch2/sakila_dimCustomer.json,sakila_dimCustomer.json,137407,1701983322000
dbfs:/FileStore/lab_data2/lab_data2/retail/batch2/sakila_dimInventory.json,sakila_dimInventory.json,415276,1701892416000
dbfs:/FileStore/lab_data2/lab_data2/retail/batch2/sakila_dimLanguage.json,sakila_dimLanguage.json,455,1701892417000
dbfs:/FileStore/lab_data2/lab_data2/retail/batch2/sakila_dimPayment-1.csv,sakila_dimPayment-1.csv,1050295,1701973064000
dbfs:/FileStore/lab_data2/lab_data2/retail/batch2/sakila_dimPayment.csv,sakila_dimPayment.csv,1050295,1701892418000
dbfs:/FileStore/lab_data2/lab_data2/retail/batch2/sakila_dimRental-1.json,sakila_dimRental-1.json,2994932,1701974287000
dbfs:/FileStore/lab_data2/lab_data2/retail/batch2/sakila_dimRental.json,sakila_dimRental.json,2,1701892415000


%md
##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
# Directory holding the batch JSON files, written as a "/dbfs/..." path because
# set_mongo_collection() reads the files with Python's built-in open().
source_dir = '/dbfs/FileStore/lab_data2/lab_data2/retail/batch2'
# Mapping of MongoDB collection name -> JSON file that populates it.
# NOTE(review): the rental entry uses 'sakila_dimRental-1.json'; the directory listing
# above shows 'sakila_dimRental.json' is only 2 bytes, presumably an empty export.
json_files = {"inventory" : 'sakila_dimInventory.json'
              , "language" : 'sakila_dimLanguage.json'
              , "rental": 'sakila_dimRental-1.json'
              , "customer": 'sakila_dimCustomer.json'}


# Drops and re-creates each collection listed above in the Atlas database.
set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

Out[25]: <pymongo.results.InsertManyResult at 0x7f9fe8ae0940>

##### 2.3.1. Fetch Inventory Dimension Data from the New MongoDB Collection

In [0]:
%scala
// Builds the MongoDB Atlas connection URI that the Scala read cells below pass to the
// MongoDB Spark connector.
// NOTE(review): the credentials duplicate the values in the Python configuration cell
// and are hard-coded in clear text; load them from a secret store instead -- TODO.
import com.mongodb.spark._

val userName = "rhiannonstaley"
val pwd = "Awesomeuva25!"
val clusterName = "DS2002.fymrynp"
// NOTE(review): userName and pwd are interpolated without percent-encoding; confirm
// this stays valid if the password ever contains URI-reserved characters.
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

// Read the "inventory" collection of the "sakila_dw2" Atlas database through the
// MongoDB Spark connector and keep only the four columns used by the dimension table.
// atlas_uri is defined in the previous %scala cell.
val df_inventory = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw2")
.option("collection", "inventory").load()
.select("inventory_key","film_id","store_id","last_update")

display(df_inventory)

inventory_key,film_id,store_id,last_update
1,1,1,2006-02-15 05:09:17
2,1,1,2006-02-15 05:09:17
3,1,1,2006-02-15 05:09:17
4,1,1,2006-02-15 05:09:17
5,1,2,2006-02-15 05:09:17
6,1,2,2006-02-15 05:09:17
7,1,2,2006-02-15 05:09:17
8,1,2,2006-02-15 05:09:17
9,2,2,2006-02-15 05:09:17
10,2,2,2006-02-15 05:09:17


In [0]:
%scala
df_inventory.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Inventory Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory

col_name,data_type,comment
inventory_key,int,
film_id,int,
store_id,int,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Created Time,Thu Dec 07 20:49:04 UTC 2023,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

inventory_key,film_id,store_id,last_update
1,1,1,2006-02-15 05:09:17
2,1,1,2006-02-15 05:09:17
3,1,1,2006-02-15 05:09:17
4,1,1,2006-02-15 05:09:17
5,1,2,2006-02-15 05:09:17


##### 2.4.1 Fetch Language Dimension Data from the New MongoDB Collection

In [0]:
%scala
// Read the "language" collection of the "sakila_dw2" Atlas database through the
// MongoDB Spark connector and keep only the three columns used by the dimension table.
// atlas_uri is defined in an earlier %scala cell.
val df_language = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw2")
.option("collection", "language").load()
.select("language_key","name", "last_update")

display(df_language)

language_key,name,last_update
1,English,2006-02-15 05:02:19
2,Italian,2006-02-15 05:02:19
3,Japanese,2006-02-15 05:02:19
4,Mandarin,2006-02-15 05:02:19
5,French,2006-02-15 05:02:19
6,German,2006-02-15 05:02:19


In [0]:
%scala
df_language.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Language Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_language.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_language")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_language

col_name,data_type,comment
language_key,int,
name,string,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_language,
Created Time,Thu Dec 07 20:49:19 UTC 2023,
Last Access,UNKNOWN,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_language LIMIT 5

language_key,name,last_update
1,English,2006-02-15 05:02:19
2,Italian,2006-02-15 05:02:19
3,Japanese,2006-02-15 05:02:19
4,Mandarin,2006-02-15 05:02:19
5,French,2006-02-15 05:02:19


##### Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
// Read the "customer" collection of the "sakila_dw2" Atlas database through the
// MongoDB Spark connector and keep the columns used by the customer dimension table.
// atlas_uri is defined in an earlier %scala cell.
// NOTE(review): the displayed rows include customer names and e-mail addresses;
// confirm this output is acceptable to share before publishing the notebook.
val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw2")
.option("collection", "customer").load()
.select("customer_key", "store_id", "first_name", "last_name", "email", "address_id", "active", "create_date", "last_update")

display(df_customer)

customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1,2006-02-14 22:04:36,2006-02-15 04:57:20
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1,2006-02-14 22:04:36,2006-02-15 04:57:20
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1,2006-02-14 22:04:36,2006-02-15 04:57:20
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1,2006-02-14 22:04:36,2006-02-15 04:57:20
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,int,
create_date,string,
last_update,string,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


##### 2.4.1 Fetch Rental Dimension Data from the New MongoDB Collection

In [0]:
%scala
// Read the "rental" collection of the "sakila_dw2" Atlas database through the
// MongoDB Spark connector and keep the columns used by the rental dimension table.
// atlas_uri is defined in an earlier %scala cell.
val df_rental = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw2")
.option("collection", "rental").load()
.select("rental_key", "rental_date", "inventory_id", "customer_id", "return_date", "staff_id", "last_update")

display(df_rental)

rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53
3,2005-05-24 23:03:39,1711,408,2005-06-01 22:12:39,1,2006-02-15 21:30:53
4,2005-05-24 23:04:41,2452,333,2005-06-03 01:43:41,2,2006-02-15 21:30:53
5,2005-05-24 23:05:21,2079,222,2005-06-02 04:33:21,1,2006-02-15 21:30:53
6,2005-05-24 23:08:07,2792,549,2005-05-27 01:32:07,1,2006-02-15 21:30:53
7,2005-05-24 23:11:53,3995,269,2005-05-29 20:34:53,2,2006-02-15 21:30:53
8,2005-05-24 23:31:46,2346,239,2005-05-27 23:33:46,2,2006-02-15 21:30:53
9,2005-05-25 00:00:40,2580,126,2005-05-28 00:22:40,1,2006-02-15 21:30:53
10,2005-05-25 00:02:21,1824,399,2005-05-31 22:44:21,2,2006-02-15 21:30:53


In [0]:
%scala
df_rental.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Rental Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_rental.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_rental")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_rental

col_name,data_type,comment
rental_key,int,
rental_date,string,
inventory_id,int,
customer_id,int,
return_date,string,
staff_id,int,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_rental LIMIT 5

rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53
3,2005-05-24 23:03:39,1711,408,2005-06-01 22:12:39,1,2006-02-15 21:30:53
4,2005-05-24 23:04:41,2452,333,2005-06-03 01:43:41,2,2006-02-15 21:30:53
5,2005-05-24 23:05:21,2079,222,2005-06-02 04:33:21,1,2006-02-15 21:30:53


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
# Location of the payments CSV inside the batch directory.
payment_csv = f"{batch_dir}/sakila_dimPayment-1.csv"

# header='true' takes the column names from the first line of the file;
# inferSchema='true' lets Spark derive the column types from the data
# (the printSchema() output below shows the resulting integer/double/timestamp types).
df_payment = spark.read.format('csv').options(header='true', inferSchema='true').load(payment_csv)
display(df_payment)

payment_key,customer_id,staff_id,rental_id,amount,payment_date,last_update
1,1,1,76,2.99,2005-05-25T11:30:37.000+0000,2006-02-15T22:12:30.000+0000
2,1,1,573,0.99,2005-05-28T10:35:23.000+0000,2006-02-15T22:12:30.000+0000
3,1,1,1185,5.99,2005-06-15T00:54:12.000+0000,2006-02-15T22:12:30.000+0000
4,1,2,1422,0.99,2005-06-15T18:02:53.000+0000,2006-02-15T22:12:30.000+0000
5,1,2,1476,9.99,2005-06-15T21:08:46.000+0000,2006-02-15T22:12:30.000+0000
6,1,1,1725,4.99,2005-06-16T15:18:57.000+0000,2006-02-15T22:12:30.000+0000
7,1,1,2308,4.99,2005-06-18T08:41:48.000+0000,2006-02-15T22:12:30.000+0000
8,1,2,2363,0.99,2005-06-18T13:33:59.000+0000,2006-02-15T22:12:30.000+0000
9,1,1,3284,3.99,2005-06-21T06:24:45.000+0000,2006-02-15T22:12:30.000+0000
10,1,2,4526,5.99,2005-07-08T03:17:05.000+0000,2006-02-15T22:12:30.000+0000


In [0]:
df_payment.printSchema()

root
 |-- payment_key: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_payment")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payment;

col_name,data_type,comment
payment_key,int,
customer_id,int,
staff_id,int,
rental_id,int,
amount,double,
payment_date,timestamp,
last_update,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_payment LIMIT 5;

payment_key,customer_id,staff_id,rental_id,amount,payment_date,last_update
1,1,1,76,2.99,2005-05-25T11:30:37.000+0000,2006-02-15T22:12:30.000+0000
2,1,1,573,0.99,2005-05-28T10:35:23.000+0000,2006-02-15T22:12:30.000+0000
3,1,1,1185,5.99,2005-06-15T00:54:12.000+0000,2006-02-15T22:12:30.000+0000
4,1,2,1422,0.99,2005-06-15T18:02:53.000+0000,2006-02-15T22:12:30.000+0000
5,1,2,1476,9.99,2005-06-15T21:08:46.000+0000,2006-02-15T22:12:30.000+0000


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_language,False
sakila_dlh,dim_payment,False
sakila_dlh,dim_rental,False
,display_query_1,True
,display_query_2,True
,fact_orders_silver_tempview,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Bronze ingest: open a streaming read of the JSON files in orders_stream_dir with the
# "cloudFiles" (Auto Loader) source and register it as the temporary view
# "orders_raw_tempview" for the %sql cells that follow.
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 # NOTE(review): the disabled hints below repeat the same option key once per column.
 # If they were re-enabled as written, each call would replace the previous one and
 # only the last hint would apply; schema hints are normally supplied as ONE
 # comma-separated string, e.g. "customer_id BIGINT, amount DECIMAL(10,2)" -- confirm
 # against the Auto Loader documentation. The column names are also left over from a
 # different data set and do not match the columns shown in the output of this stream.
 #.option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 #.option("cloudFiles.schemaHints", "order_key BIGINT")
 #.option("cloudFiles.schemaHints", "employee_key BIGINT")
 #.option("cloudFiles.schemaHints", "customer_key BIGINT") 
 #.option("cloudFiles.schemaHints", "product_key BIGINT")
 #.option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 #.option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 #.option("cloudFiles.schemaHints", "quantity DECIMAL")
 #.option("cloudFiles.schemaHints", "unit_price DECIMAL")
 #.option("cloudFiles.schemaHints", "discount DECIMAL")
 #.option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 #.option("cloudFiles.schemaHints", "taxes DECIMAL")
 #.option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 #.option("cloudFiles.schemaHints", "payment_type STRING")
 #.option("cloudFiles.schemaHints", "order_status STRING")
 #.option("cloudFiles.schemaHints", "order_details_status STRING")
 # The inferred schema is tracked under the bronze output directory.
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 # Infer column types from the data rather than reading every column as a string.
 .option("cloudFiles.inferColumnTypes", "true")
 # Allow a single JSON record/array to span several lines of a file.
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

amount,customer_id,index_key,inventory_id,last_update,payment_date,payment_id,rental_date,rental_id,staff_id,_rescued_data,receipt_time,source_file
4.99,596,666,797,2006-02-15 21:30:53,2005-05-28 21:49:02,15958,2005-05-28 21:49:02,667,1,,2023-12-08T02:04:59.439+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
2.99,484,667,3528,2006-02-15 21:30:53,2005-05-28 21:54:45,13056,2005-05-28 21:54:45,668,1,,2023-12-08T02:04:59.439+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
4.99,313,668,3677,2006-02-15 21:30:53,2005-05-28 22:03:25,8481,2005-05-28 22:03:25,669,1,,2023-12-08T02:04:59.439+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
6.99,201,669,227,2006-02-15 21:30:53,2005-05-28 22:04:03,5446,2005-05-28 22:04:03,670,2,,2023-12-08T02:04:59.439+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
2.99,14,670,1027,2006-02-15 21:30:53,2005-05-28 22:04:30,361,2005-05-28 22:04:30,671,2,,2023-12-08T02:04:59.439+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
6.99,306,671,697,2006-02-15 21:30:53,2005-05-28 22:05:29,8295,2005-05-28 22:05:29,672,2,,2023-12-08T02:04:59.439+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
0.99,468,672,1769,2006-02-15 21:30:53,2005-05-28 22:07:30,12615,2005-05-28 22:07:30,673,1,,2023-12-08T02:04:59.439+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
2.99,87,673,1150,2006-02-15 21:30:53,2005-05-28 22:11:35,2359,2005-05-28 22:11:35,674,2,,2023-12-08T02:04:59.439+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
0.99,338,674,1273,2006-02-15 21:30:53,2005-05-28 22:22:44,9126,2005-05-28 22:22:44,675,2,,2023-12-08T02:04:59.439+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
4.99,490,675,2329,2006-02-15 21:30:53,2005-05-28 22:27:51,13210,2005-05-28 22:27:51,676,2,,2023-12-08T02:04:59.439+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json


In [0]:
# Start a streaming query that appends the bronze view to the Delta table
# sakila_dlh.fact_orders_bronze; progress is tracked in the "_checkpoint" folder under
# the bronze output directory.
# The table name is qualified with its database so the write no longer depends on an
# earlier "USE sakila_dlh" cell having set the session's current database, and
# toTable() is the DataStreamWriter method documented by PySpark (3.1+).
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .toTable("sakila_dlh.fact_orders_bronze"))

Out[43]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f9ff01d8070>

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Open a streaming read of the bronze Delta table and register it as the temporary view
# "orders_silver_tempview" for the silver-layer SQL below.
# The table name is qualified with its database so the read does not depend on an
# earlier "USE sakila_dlh" cell having set the session's current database.
(spark.readStream
  .table("sakila_dlh.fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

amount,customer_id,index_key,inventory_id,last_update,payment_date,payment_id,rental_date,rental_id,staff_id,_rescued_data,receipt_time,source_file
4.99,596,666,797,2006-02-15 21:30:53,2005-05-28 21:49:02,15958,2005-05-28 21:49:02,667,1,,2023-12-08T02:05:05.515+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
2.99,484,667,3528,2006-02-15 21:30:53,2005-05-28 21:54:45,13056,2005-05-28 21:54:45,668,1,,2023-12-08T02:05:05.515+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
4.99,313,668,3677,2006-02-15 21:30:53,2005-05-28 22:03:25,8481,2005-05-28 22:03:25,669,1,,2023-12-08T02:05:05.515+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
6.99,201,669,227,2006-02-15 21:30:53,2005-05-28 22:04:03,5446,2005-05-28 22:04:03,670,2,,2023-12-08T02:05:05.515+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
2.99,14,670,1027,2006-02-15 21:30:53,2005-05-28 22:04:30,361,2005-05-28 22:04:30,671,2,,2023-12-08T02:05:05.515+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
6.99,306,671,697,2006-02-15 21:30:53,2005-05-28 22:05:29,8295,2005-05-28 22:05:29,672,2,,2023-12-08T02:05:05.515+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
0.99,468,672,1769,2006-02-15 21:30:53,2005-05-28 22:07:30,12615,2005-05-28 22:07:30,673,1,,2023-12-08T02:05:05.515+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
2.99,87,673,1150,2006-02-15 21:30:53,2005-05-28 22:11:35,2359,2005-05-28 22:11:35,674,2,,2023-12-08T02:05:05.515+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
0.99,338,674,1273,2006-02-15 21:30:53,2005-05-28 22:22:44,9126,2005-05-28 22:22:44,675,2,,2023-12-08T02:05:05.515+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json
4.99,490,675,2329,2006-02-15 21:30:53,2005-05-28 22:27:51,13210,2005-05-28 22:27:51,676,2,,2023-12-08T02:05:05.515+0000,dbfs:/FileStore/lab_data2/lab_data2/retail/stream2/orders/sakila_factOrders3.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
amount,double,
customer_id,bigint,
index_key,bigint,
inventory_id,bigint,
last_update,string,
payment_date,string,
payment_id,bigint,
rental_date,string,
rental_id,bigint,
staff_id,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.amount,
      o.customer_id,
      o.index_key,
      o.inventory_id,
      o.last_update,
      o.payment_date,
      o.payment_id,
      o.rental_date AS order_rental_date,
      o.staff_id,
      o._rescued_data,
      o.receipt_time,
      c.customer_key,
      d.full_date AS dim_full_date
  FROM orders_silver_tempview AS o
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = o.customer_id
  LEFT JOIN sakila_dlh.dim_date AS d
  ON d.full_date = o.rental_date
)

In [0]:
# Start a streaming query that appends the enriched silver view to the Delta table
# sakila_dlh.fact_orders_silver; progress is tracked in the "_checkpoint" folder under
# the silver output directory.
# The table name is qualified with its database to match the later cells that read
# sakila_dlh.fact_orders_silver, instead of relying on the session's current database,
# and toTable() is the DataStreamWriter method documented by PySpark (3.1+).
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .toTable("sakila_dlh.fact_orders_silver"))

Out[49]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f9ff0164460>

In [0]:
%sql
SELECT * FROM fact_orders_silver

amount,customer_id,index_key,inventory_id,last_update,payment_date,payment_id,order_rental_date,staff_id,_rescued_data,receipt_time,customer_key,dim_full_date
4.99,596,666,797,2006-02-15 21:30:53,2005-05-28 21:49:02,15958,2005-05-28 21:49:02,1,,2023-12-08T02:05:05.515+0000,596,2005-05-28
2.99,484,667,3528,2006-02-15 21:30:53,2005-05-28 21:54:45,13056,2005-05-28 21:54:45,1,,2023-12-08T02:05:05.515+0000,484,2005-05-28
4.99,313,668,3677,2006-02-15 21:30:53,2005-05-28 22:03:25,8481,2005-05-28 22:03:25,1,,2023-12-08T02:05:05.515+0000,313,2005-05-28
6.99,201,669,227,2006-02-15 21:30:53,2005-05-28 22:04:03,5446,2005-05-28 22:04:03,2,,2023-12-08T02:05:05.515+0000,201,2005-05-28
2.99,14,670,1027,2006-02-15 21:30:53,2005-05-28 22:04:30,361,2005-05-28 22:04:30,2,,2023-12-08T02:05:05.515+0000,14,2005-05-28
6.99,306,671,697,2006-02-15 21:30:53,2005-05-28 22:05:29,8295,2005-05-28 22:05:29,2,,2023-12-08T02:05:05.515+0000,306,2005-05-28
0.99,468,672,1769,2006-02-15 21:30:53,2005-05-28 22:07:30,12615,2005-05-28 22:07:30,1,,2023-12-08T02:05:05.515+0000,468,2005-05-28
2.99,87,673,1150,2006-02-15 21:30:53,2005-05-28 22:11:35,2359,2005-05-28 22:11:35,2,,2023-12-08T02:05:05.515+0000,87,2005-05-28
0.99,338,674,1273,2006-02-15 21:30:53,2005-05-28 22:22:44,9126,2005-05-28 22:22:44,2,,2023-12-08T02:05:05.515+0000,338,2005-05-28
4.99,490,675,2329,2006-02-15 21:30:53,2005-05-28 22:27:51,13210,2005-05-28 22:27:51,2,,2023-12-08T02:05:05.515+0000,490,2005-05-28


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_orders_silver

col_name,data_type,comment
amount,double,
customer_id,bigint,
index_key,bigint,
inventory_id,bigint,
last_update,string,
payment_date,string,
payment_id,bigint,
order_rental_date,string,
staff_id,bigint,
_rescued_data,string,


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_price_by_customer_gold AS (
  SELECT customer_id AS Customer
  , SUM(amount) AS TotalPriceOverTime
  FROM sakila_dlh.fact_orders_silver
  GROUP BY Customer
  ORDER BY TotalPriceOverTime DESC);

SELECT * FROM sakila_dlh.fact_price_by_customer_gold;

Customer,TotalPriceOverTime
245,28.94
105,26.96
429,26.95
19,26.94
246,25.96
371,25.94
391,24.950000000000003
274,24.94
53,23.950000000000003
432,22.97
