## Lab 06: Data Lakehouse with Structured Streaming
This lab will help you learn to use many of the software libraries and programming techniques required to fulfill the requirements of the final end-of-session capstone project for course **DS-2002: Data Systems**. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "aen6ju-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "aen6ju",
  "password" : "Password123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0sb.od5pn"
atlas_database_name = "sakila_dw"
atlas_user_name = "aen6ju"
atlas_password = "Password123"
# "user_name" : "aen6ju",
#     "password" : "Password123",
#     "cluster_name" : "Cluster0SB",

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/ds2002-final"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/rentals"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rentals_stream_dir = f"{stream_dir}/rentals"

output_bronze = f"{database_dir}/fact_orders/bronze"
output_silver = f"{database_dir}/fact_orders/silver"
output_gold   = f"{database_dir}/fact_orders/gold"



# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 


# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aen6ju-mysql.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "aen6ju",    --Replace with your User Name
  password "Password123"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
year,int,
month,int,
day,int,
hour,int,
minute,int,
second,int,
date_key,bigint,
date,timestamp,
,,
# Delta Statistics Columns,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

year,month,day,hour,minute,second,date_key,date
2005,5,25,11,30,37,1,2005-05-25T11:30:37Z
2005,5,28,10,35,23,2,2005-05-28T10:35:23Z
2005,6,15,0,54,12,3,2005-06-15T00:54:12Z
2005,6,15,18,2,53,4,2005-06-15T18:02:53Z
2005,6,15,21,8,46,5,2005-06-15T21:08:46Z


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aen6ju-mysql.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_customer",
  user "aen6ju",    --Replace with your User Name
  password "Password123"  --Replace with you password
)


In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_customer
COMMENT "Customers Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh/dim_customer"
AS SELECT * FROM view_customer


num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_id,bigint,
store_id,bigint,
first_name,varchar(65535),
last_name,varchar(65535),
email,varchar(65535),
address_id,bigint,
active,bigint,
create_date,varchar(65535),
last_update,varchar(65535),
customer_key,bigint,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update,customer_key
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20,1
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20,2
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20,3
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20,4
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20,5


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/ds2002-final/rentals/batch'

path,name,size,modificationTime
dbfs:/FileStore/ds2002-final/rentals/batch/dim_customer.csv,dim_customer.csv,63726,1733437424000
dbfs:/FileStore/ds2002-final/rentals/batch/dim_date.csv,dim_date.csv,44008,1733437424000
dbfs:/FileStore/ds2002-final/rentals/batch/dim_date.json,dim_date.json,153963,1733437424000
dbfs:/FileStore/ds2002-final/rentals/batch/dim_staff.csv,dim_staff.csv,235,1733437424000
dbfs:/FileStore/ds2002-final/rentals/batch/dim_staff.json,dim_staff.json,500,1733437424000
dbfs:/FileStore/ds2002-final/rentals/batch/dim_store.csv,dim_store.csv,103,1733437424000
dbfs:/FileStore/ds2002-final/rentals/batch/dim_store.json,dim_store.json,247,1733437425000
dbfs:/FileStore/ds2002-final/rentals/batch/fact_payment.csv,fact_payment.csv,19596,1733437425000
dbfs:/FileStore/ds2002-final/rentals/batch/fact_payment.json,fact_payment.json,126540,1733437425000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002-final/rentals/batch'
json_files = {"store" : 'dim_store.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f43bc22a900>

##### 2.3.1. Fetch Store Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "aen6ju"
val pwd = "Password123"
val clusterName = "cluster0sb.od5pn"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_store = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw")
.option("collection", "store").load()
.select("store_id","manager_staff_id","address_id","last_update","store_key")

display(df_store)

store_id,manager_staff_id,address_id,last_update,store_key
1,1,1,1139979432000,1
2,2,2,1139979432000,2


In [0]:
%scala
df_store.printSchema()

In [0]:
%scala
df_store.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_store")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_store

col_name,data_type,comment
store_id,int,
manager_staff_id,int,
address_id,int,
last_update,bigint,
store_key,int,
,,
# Delta Statistics Columns,,
Column Names,"store_id, address_id, manager_staff_id, last_update, store_key",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_store LIMIT 5

store_id,manager_staff_id,address_id,last_update,store_key
1,1,1,1139979432000,1
2,2,2,1139979432000,2


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
staff_csv = f"{batch_dir}/dim_staff.csv"

df_staff = spark.read.format('csv').options(header='true', inferSchema='true').load(staff_csv)
display(df_staff)

staff_id,first_name,last_name,address_id,email,store_id,active,username,last_update,staff_key
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,1139975836000,1
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,1139975836000,2


In [0]:
df_staff.printSchema()

root
 |-- staff_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- username: string (nullable = true)
 |-- last_update: long (nullable = true)
 |-- staff_key: integer (nullable = true)



In [0]:
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_id,int,
first_name,string,
last_name,string,
address_id,int,
email,string,
store_id,int,
active,int,
username,string,
last_update,bigint,
staff_key,int,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5;

staff_id,first_name,last_name,address_id,email,store_id,active,username,last_update,staff_key
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,1139975836000,1
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,1139975836000,2


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_staff,False
sakila_dlh,dim_store,False
,_sqldf,True
,view_customer,True
,view_date,True


^^EVERYTHING BEFORE THIS IS DONE AND MAKING THE SAKILA_DLH

### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 #.option("cloudFiles.schemaHints", "order_key BIGINT")
 #.option("cloudFiles.schemaHints", "employee_key BIGINT")
 #.option("cloudFiles.schemaHints", "customer_key BIGINT") 
 #.option("cloudFiles.schemaHints", "product_key BIGINT")
 #.option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 #.option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 #.option("cloudFiles.schemaHints", "quantity DECIMAL")
 #.option("cloudFiles.schemaHints", "unit_price DECIMAL")
 #.option("cloudFiles.schemaHints", "discount DECIMAL")
 #.option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 #.option("cloudFiles.schemaHints", "taxes DECIMAL")
 #.option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 #.option("cloudFiles.schemaHints", "payment_type STRING")
 #.option("cloudFiles.schemaHints", "order_status STRING")
 #.option("cloudFiles.schemaHints", "order_details_status STRING")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("fact_rentals_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW fact_orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM fact_rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM fact_orders_bronze_tempview

amount,customer_key,date_key,fact_rental_key,staff_id,store_key,_rescued_data,receipt_time,source_file
2.99,13,333,335,2,2,,2024-12-07T21:19:30.77Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment2-1.json
8.99,13,334,336,1,1,,2024-12-07T21:19:30.77Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment2-1.json
2.99,13,335,337,1,1,,2024-12-07T21:19:30.77Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment2-1.json
0.99,13,336,338,1,1,,2024-12-07T21:19:30.77Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment2-1.json
2.99,13,337,339,1,1,,2024-12-07T21:19:30.77Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment2-1.json
2.99,13,338,340,1,1,,2024-12-07T21:19:30.77Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment2-1.json
2.99,13,339,341,1,1,,2024-12-07T21:19:30.77Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment2-1.json
11.99,13,340,342,2,2,,2024-12-07T21:19:30.77Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment2-1.json
9.99,13,341,343,2,2,,2024-12-07T21:19:30.77Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment2-1.json
0.99,13,342,344,2,2,,2024-12-07T21:19:30.77Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment2-1.json


In [0]:
(spark.table("fact_orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze1"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f43dbd8b410>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze1")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

amount,customer_key,date_key,fact_rental_key,staff_id,store_key,_rescued_data,receipt_time,source_file
2.99,1,1,1,1,1,,2024-12-07T21:18:59.283Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment1-1.json
0.99,1,2,2,1,1,,2024-12-07T21:18:59.283Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment1-1.json
5.99,1,3,3,1,1,,2024-12-07T21:18:59.283Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment1-1.json
0.99,1,4,4,2,2,,2024-12-07T21:18:59.283Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment1-1.json
9.99,1,5,5,2,2,,2024-12-07T21:18:59.283Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment1-1.json
4.99,1,6,6,1,1,,2024-12-07T21:18:59.283Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment1-1.json
4.99,1,7,7,1,1,,2024-12-07T21:18:59.283Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment1-1.json
0.99,1,8,8,2,2,,2024-12-07T21:18:59.283Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment1-1.json
3.99,1,9,9,1,1,,2024-12-07T21:18:59.283Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment1-1.json
5.99,1,10,10,2,2,,2024-12-07T21:18:59.283Z,dbfs:/FileStore/ds2002-final/rentals/stream/rentals/fact_payment1-1.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
amount,double,
customer_key,bigint,
date_key,bigint,
fact_rental_key,bigint,
staff_id,bigint,
store_key,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT r.fact_rental_key,
      r.amount,
      c.customer_key,
      c.first_name as customer_first_name,
      c.last_name as customer_last_name,
      c.email as customer_email,
      c.address_id as customer_address_id,
      d.date_key,
      d.year,
      d.month,
      d.day,
      d.hour,
      d.minute,
      staff.staff_key,
      staff.store_id,
      staff.first_name as staff_first_name,
      staff.last_name as staff_last_name,
      staff.email as staff_email,
      store.address_id as store_address_id,
      store.manager_staff_id as manager_id,
      store.store_key
      

  FROM rentals_silver_tempview AS r

  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = r.customer_key

  INNER JOIN sakila_dlh.dim_staff AS staff
  ON staff.staff_key = r.staff_id
  
  INNER JOIN sakila_dlh.dim_store AS store
  ON store.store_key = r.store_key

  INNER JOIN sakila_dlh.dim_date AS d
  ON d.date_key = r.date_key
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f43dbd4cfd0>

In [0]:
%sql
SELECT * FROM fact_orders_silver

fact_rental_key,amount,customer_key,customer_first_name,customer_last_name,customer_email,customer_address_id,date_key,year,month,day,hour,minute,staff_key,store_id,staff_first_name,staff_last_name,staff_email,store_address_id,manager_id,store_key
32,5.99,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,32,2005,8,22,20,3,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,1,1
59,4.99,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,59,2005,8,23,17,39,2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,2,2,2
85,2.99,3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,85,2005,8,23,7,10,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,1,1
107,1.99,4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,107,2005,8,23,7,43,2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,2,2,2
145,0.99,5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,145,2006,2,14,15,16,2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,2,2,2
173,0.99,6,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,173,2005,8,23,6,41,2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,2,2,2
206,5.99,7,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,206,2005,8,21,4,49,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,1,1
230,4.99,8,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,230,2005,8,23,14,31,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,1,1
253,4.99,9,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,145,2006,2,14,15,16,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,1,1
278,5.99,10,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,277,2005,8,22,21,59,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,1,1


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_orders_silver

col_name,data_type,comment
fact_rental_key,bigint,
amount,double,
customer_key,bigint,
customer_first_name,varchar(65535),
customer_last_name,varchar(65535),
customer_email,varchar(65535),
customer_address_id,bigint,
date_key,bigint,
year,int,
month,int,


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. The table includes the number of amount of money spent per customer each Month, along with the Customers' ID, First & Last Name, and the Month in which the order was placed.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_monthly_spent_by_customer_gold AS (
  SELECT customer_key AS CustomerID
    , customer_last_name AS LastName
    , customer_first_name AS FirstName
    , month AS OrderMonth
    , SUM(amount) AS total_spent
  FROM sakila_dlh.fact_orders_silver
  GROUP BY CustomerID, LastName, FirstName, OrderMonth)
  ORDER BY OrderMonth DESC, customer_last_name, customer_first_name;

SELECT * FROM sakila_dlh.fact_monthly_spent_by_customer_gold;



CustomerID,LastName,FirstName,OrderMonth,total_spent
11,ANDERSON,LISA,8,57.84000000000002
5,BROWN,ELIZABETH,8,107.73999999999997
21,CLARK,MICHELLE,8,79.82999999999998
6,DAVIS,JENNIFER,8,97.75999999999998
18,GARCIA,CAROL,8,18.94
15,HARRIS,HELEN,8,79.82
13,JACKSON,KAREN,8,49.900000000000006
2,JOHNSON,PATRICIA,8,89.77999999999999
4,JONES,BARBARA,8,85.77999999999999
24,LEE,KIMBERLY,8,33.910000000000004


In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_revenue_by_staff_gold AS (
  SELECT 
  staff_first_name, 
  staff_last_name, 
  COUNT(fact_rental_key) AS payments_processed, SUM(amount) AS total_revenue
  FROM sakila_dlh.fact_orders_silver
  GROUP BY staff_first_name, staff_last_name
  ORDER BY total_revenue DESC);

SELECT * FROM sakila_dlh.fact_revenue_by_staff_gold;

staff_first_name,staff_last_name,payments_processed,total_revenue
Mike,Hillyer,520,2155.789999999998
Jon,Stephens,481,2039.200000000004


#### Clean up the File System

In [0]:
%fs rm -r /FileStore/lab_data/

org.apache.spark.sql.catalyst.ExtendedAnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `product_key` cannot be resolved. Did you mean one of the following? [`date_key`, `minute`, `staff_key`, `store_key`, `amount`]. SQLSTATE: 42703; line 6 pos 12;
'ReplaceTableAsSelect TableSpec(Map(),None,Map(),None,None,None,false,Set(),None,None,None), true, false
:- ResolvedIdentifier com.databricks.sql.managedcatalog.UnityCatalogV2Proxy@126f804c, sakila_dlh.fact_monthly_orders_by_customer_gold
+- 'Sort ['ProductCount DESC NULLS LAST], true
   +- 'Aggregate ['CustomerID, 'LastName, 'FirstName, 'OrderMonth], [customer_key#305339L AS CustomerID#305305L, customer_last_name#305341 AS LastName#305306, customer_first_name#305340 AS FirstName#305307, month#305346 AS OrderMonth#305308, 'COUNT('product_key) AS ProductCount#305309]
      +- SubqueryAlias spark_catalog.sakila_dlh.fact_orders_silver
         +- Relation spark_catalog.sakila_dlh.fact_ord