Final Project

In [0]:
#### Importing Required Libraries
%pip install pymongo

import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
#### Instantiating Global Variables

# Azure MySQL Server Connection Information ###################
jdbc_hostname = "edward-zhang-azure-mysql-server.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : "ezhang",
  "password" : "gamesPLAYsports333!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "taketwocluster.ozwxv"
atlas_database_name = "sakila"
atlas_user_name = "edwyzhang"
atlas_password = "IFNXmFLUY2sYMw9j"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

fact_table_stream_dir = f"{stream_dir}/fact_table"

fact_table_output_bronze = f"{database_dir}/fact_table/bronze"
fact_table_output_silver = f"{database_dir}/fact_table/silver"
fact_table_output_gold = f"{database_dir}/fact_table/gold"


# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_table", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

In [0]:
#### Defining Global Functions

#############################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
#############################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

#############################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
#############################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

In [0]:
#### Populating Dimensions by Ingesting Reference (Cold-path) Data
#### Fetching Reference Data From an Azure MySQL Database
#### Creating a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

In [0]:
##### Creating a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://edward-zhang-azure-mysql-server.mysql.database.azure.com/sakila",
  dbtable "dim_date",
  user "ezhang",   
  password "gamesPLAYsports333!" 
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20050101,2005-01-01,2005/01/01,01/01/2005,01/01/2005,7,Saturday,1,1,Weekend,53,January,1,N,1,2005,2005-01,2005Q1,1,1,2005,2005-01,2005Q1
20050102,2005-01-02,2005/01/02,01/02/2005,02/01/2005,1,Sunday,2,2,Weekend,53,January,1,N,1,2005,2005-01,2005Q1,1,1,2005,2005-01,2005Q1
20050103,2005-01-03,2005/01/03,01/03/2005,03/01/2005,2,Monday,3,3,Weekday,1,January,1,N,1,2005,2005-01,2005Q1,1,1,2005,2005-01,2005Q1
20050104,2005-01-04,2005/01/04,01/04/2005,04/01/2005,3,Tuesday,4,4,Weekday,1,January,1,N,1,2005,2005-01,2005Q1,1,1,2005,2005-01,2005Q1
20050105,2005-01-05,2005/01/05,01/05/2005,05/01/2005,4,Wednesday,5,5,Weekday,1,January,1,N,1,2005,2005-01,2005Q1,1,1,2005,2005-01,2005Q1


In [0]:
##### Creating a New Table that Sources Rental Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_rental
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://edward-zhang-azure-mysql-server.mysql.database.azure.com/sakila",
  dbtable "dim_rentals",
  user "ezhang",   
  password "gamesPLAYsports333!" 
)
-- Create a Temporary View named "view_rental" that extracts data from your MySQL Sakila database.

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_rentals
COMMENT "Rentals Dimension Table"
LOCATION "dbfs:/FileStore/sakila_dlh/dim_rentals"
AS SELECT * FROM view_rental
-- Create a new table named "sakila_dlh.dim_rentals" using data from the view named "view_rental"

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_rentals;

col_name,data_type,comment
rental_key,int,
rental_id,int,
inventory_id,int,
customer_id,int,
rental_date,timestamp,
return_date,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"customer_id, rental_date, rental_id, inventory_id, rental_key, return_date",
Column Selection Method,first-32,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_rentals LIMIT 5

rental_key,rental_id,inventory_id,customer_id,rental_date,return_date
1,1,367,130,2005-05-24T22:53:30Z,2005-05-26T22:04:30Z
2,2,1525,459,2005-05-24T22:54:33Z,2005-05-28T19:40:33Z
3,3,1711,408,2005-05-24T23:03:39Z,2005-06-01T22:12:39Z
4,4,2452,333,2005-05-24T23:04:41Z,2005-06-03T01:43:41Z
5,5,2079,222,2005-05-24T23:05:21Z,2005-06-02T04:33:21Z


In [0]:
#### Fetching Reference Data from a MongoDB Atlas Database
##### Viewing the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/retail/batch'

path,name,size,modificationTime
dbfs:/FileStore/retail/batch/dim_customers.csv,dim_customers.csv,33933,1733432255000
dbfs:/FileStore/retail/batch/dim_customers.json,dim_customers.json,98545,1733432273000
dbfs:/FileStore/retail/batch/dim_date.csv,dim_date.csv,185087,1733432255000
dbfs:/FileStore/retail/batch/dim_date.json,dim_date.json,663705,1733432273000
dbfs:/FileStore/retail/batch/dim_inventory.csv,dim_inventory.csv,35347,1733597459000
dbfs:/FileStore/retail/batch/dim_payments.csv,dim_payments.csv,57935,1733432255000
dbfs:/FileStore/retail/batch/dim_payments.json,dim_payments.json,173846,1733432273000
dbfs:/FileStore/retail/batch/dim_rentals.csv,dim_rentals.csv,60435,1733432273000
dbfs:/FileStore/retail/batch/fact_table_keys.csv,fact_table_keys.csv,110614,1733432255000


In [0]:
##### Creating a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/retail/batch'
json_files = {"dim_customers" : 'dim_customers.json'
              , "dim_payments" : 'dim_payments.json'
              }

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

InsertManyResult([ObjectId('67550ce19a99aecc3662ab5b'), ObjectId('67550ce19a99aecc3662ab5c'), ObjectId('67550ce19a99aecc3662ab5d'), ObjectId('67550ce19a99aecc3662ab5e'), ObjectId('67550ce19a99aecc3662ab5f'), ObjectId('67550ce19a99aecc3662ab60'), ObjectId('67550ce19a99aecc3662ab61'), ObjectId('67550ce19a99aecc3662ab62'), ObjectId('67550ce19a99aecc3662ab63'), ObjectId('67550ce19a99aecc3662ab64'), ObjectId('67550ce19a99aecc3662ab65'), ObjectId('67550ce19a99aecc3662ab66'), ObjectId('67550ce19a99aecc3662ab67'), ObjectId('67550ce19a99aecc3662ab68'), ObjectId('67550ce19a99aecc3662ab69'), ObjectId('67550ce19a99aecc3662ab6a'), ObjectId('67550ce19a99aecc3662ab6b'), ObjectId('67550ce19a99aecc3662ab6c'), ObjectId('67550ce19a99aecc3662ab6d'), ObjectId('67550ce19a99aecc3662ab6e'), ObjectId('67550ce19a99aecc3662ab6f'), ObjectId('67550ce19a99aecc3662ab70'), ObjectId('67550ce19a99aecc3662ab71'), ObjectId('67550ce19a99aecc3662ab72'), ObjectId('67550ce19a99aecc3662ab73'), ObjectId('67550ce19a99aecc3662ab

In [0]:
#### Fetching Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala

val userName = "edwyzhang"
val pwd = "IFNXmFLUY2sYMw9j"
val clusterName = "taketwocluster.ozwxv"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila")
.option("collection", "dim_customers").load()
.select("customer_key","customer_id","store_id","first_name","last_name","create_date")

display(df_customer)

customer_key,customer_id,store_id,first_name,last_name,create_date
1,1,1,MARY,SMITH,2006-02-14 22:04:36
2,2,1,PATRICIA,JOHNSON,2006-02-14 22:04:36
3,3,1,LINDA,WILLIAMS,2006-02-14 22:04:36
4,4,2,BARBARA,JONES,2006-02-14 22:04:36
5,5,1,ELIZABETH,BROWN,2006-02-14 22:04:36
6,6,2,JENNIFER,DAVIS,2006-02-14 22:04:36
7,7,1,MARIA,MILLER,2006-02-14 22:04:36
8,8,2,SUSAN,WILSON,2006-02-14 22:04:36
9,9,2,MARGARET,MOORE,2006-02-14 22:04:36
10,10,1,DOROTHY,TAYLOR,2006-02-14 22:04:36


In [0]:
%scala
df_customer.printSchema()

In [0]:
##### Using the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customers")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customers

col_name,data_type,comment
customer_key,int,
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
create_date,string,
,,
# Delta Statistics Columns,,
Column Names,"first_name, customer_id, store_id, create_date, last_name, customer_key",
Column Selection Method,first-32,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customers LIMIT 5

customer_key,customer_id,store_id,first_name,last_name,create_date
1,1,1,MARY,SMITH,2006-02-14 22:04:36
2,2,1,PATRICIA,JOHNSON,2006-02-14 22:04:36
3,3,1,LINDA,WILLIAMS,2006-02-14 22:04:36
4,4,2,BARBARA,JONES,2006-02-14 22:04:36
5,5,1,ELIZABETH,BROWN,2006-02-14 22:04:36


In [0]:
#### Fetching Payment Dimension Data from the New MongoDB Collection

In [0]:
%scala
val df_payment = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila")
.option("collection", "dim_payments").load()
.select("payment_key","payment_id","customer_id","staff_id","rental_id","amount","payment_date")

display(df_payment)

payment_key,payment_id,customer_id,staff_id,rental_id,amount,payment_date
1,1,1,1,76,2.99,2005-05-25 11:30:37
2,2,1,1,573,0.99,2005-05-28 10:35:23
3,3,1,1,1185,5.99,2005-06-15 00:54:12
4,4,1,2,1422,0.99,2005-06-15 18:02:53
5,5,1,2,1476,9.99,2005-06-15 21:08:46
6,6,1,1,1725,4.99,2005-06-16 15:18:57
7,7,1,1,2308,4.99,2005-06-18 08:41:48
8,8,1,2,2363,0.99,2005-06-18 13:33:59
9,9,1,1,3284,3.99,2005-06-21 06:24:45
10,10,1,2,4526,5.99,2005-07-08 03:17:05


In [0]:
%scala
df_payment.printSchema()

In [0]:
##### Use the Spark DataFrame to Create a New Payment Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_payments")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payments

col_name,data_type,comment
payment_key,int,
payment_id,int,
customer_id,int,
staff_id,int,
rental_id,int,
amount,double,
payment_date,string,
,,
# Delta Statistics Columns,,
Column Names,"customer_id, rental_id, payment_date, amount, payment_id, payment_key, staff_id",


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_payments LIMIT 5

payment_key,payment_id,customer_id,staff_id,rental_id,amount,payment_date
1,1,1,1,76,2.99,2005-05-25 11:30:37
2,2,1,1,573,0.99,2005-05-28 10:35:23
3,3,1,1,1185,5.99,2005-06-15 00:54:12
4,4,1,2,1422,0.99,2005-06-15 18:02:53
5,5,1,2,1476,9.99,2005-06-15 21:08:46


In [0]:
##### Fetching Data from a File System, Using PySpark to Read From a CSV File


In [0]:
inventory_csv = f"{batch_dir}/dim_inventory.csv"

df_inventory = spark.read.format('csv').options(header='true', inferSchema='true').load(inventory_csv)
display(df_inventory)

inventory_key,inventory_id,film_id,store_id,last_update
1,1,1,1,2006-02-15T05:09:17Z
2,2,1,1,2006-02-15T05:09:17Z
3,3,1,1,2006-02-15T05:09:17Z
4,4,1,1,2006-02-15T05:09:17Z
5,5,1,2,2006-02-15T05:09:17Z
6,6,1,2,2006-02-15T05:09:17Z
7,7,1,2,2006-02-15T05:09:17Z
8,8,1,2,2006-02-15T05:09:17Z
9,9,2,2,2006-02-15T05:09:17Z
10,10,2,2,2006-02-15T05:09:17Z


In [0]:
df_inventory.printSchema()

root
 |-- inventory_key: integer (nullable = true)
 |-- inventory_id: integer (nullable = true)
 |-- film_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory;

col_name,data_type,comment
inventory_key,int,
inventory_id,int,
film_id,int,
store_id,int,
last_update,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"film_id, inventory_key, inventory_id, store_id, last_update",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5;

inventory_key,inventory_id,film_id,store_id,last_update
1,1,1,1,2006-02-15T05:09:17Z
2,2,1,1,2006-02-15T05:09:17Z
3,3,1,1,2006-02-15T05:09:17Z
4,4,1,1,2006-02-15T05:09:17Z
5,5,1,2,2006-02-15T05:09:17Z


In [0]:
##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customers,False
sakila_dlh,dim_date,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_payments,False
sakila_dlh,dim_rentals,False
,_sqldf,True
,display_query_1,True
,display_query_2,True
,display_query_3,True
,fact_table_bronze_tempview,True


In [0]:
#### Integrating Reference Data with Real-Time Data
#### Using AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### Bronze Table: Processing 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", fact_table_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(fact_table_stream_dir)
 .createOrReplaceTempView("fact_table_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW fact_table_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM fact_table_raw_tempview
)

In [0]:
%sql
SELECT * FROM fact_table_bronze_tempview

amount,create_date_key,customer_key,email,fact_table_key,first_name,last_name,last_update,payment_date_key,payment_key,store_key,_rescued_data,receipt_time,source_file
4.99,20060214,25,DEBORAH.WALKER@sakilacustomer.org,667,DEBORAH,WALKER,2006-02-15 04:57:12,20050712,667,1,,2024-12-08T03:05:33.793Z,dbfs:/FileStore/retail/stream/fact_table/fact_table03.json
2.99,20060214,25,DEBORAH.WALKER@sakilacustomer.org,668,DEBORAH,WALKER,2006-02-15 04:57:12,20050712,668,1,,2024-12-08T03:05:33.793Z,dbfs:/FileStore/retail/stream/fact_table/fact_table03.json
2.99,20060214,25,DEBORAH.WALKER@sakilacustomer.org,669,DEBORAH,WALKER,2006-02-15 04:57:12,20050729,669,1,,2024-12-08T03:05:33.793Z,dbfs:/FileStore/retail/stream/fact_table/fact_table03.json
0.99,20060214,25,DEBORAH.WALKER@sakilacustomer.org,670,DEBORAH,WALKER,2006-02-15 04:57:12,20050730,670,1,,2024-12-08T03:05:33.793Z,dbfs:/FileStore/retail/stream/fact_table/fact_table03.json
8.99,20060214,25,DEBORAH.WALKER@sakilacustomer.org,671,DEBORAH,WALKER,2006-02-15 04:57:12,20050730,671,1,,2024-12-08T03:05:33.793Z,dbfs:/FileStore/retail/stream/fact_table/fact_table03.json
2.99,20060214,25,DEBORAH.WALKER@sakilacustomer.org,672,DEBORAH,WALKER,2006-02-15 04:57:12,20050730,672,1,,2024-12-08T03:05:33.793Z,dbfs:/FileStore/retail/stream/fact_table/fact_table03.json
2.99,20060214,25,DEBORAH.WALKER@sakilacustomer.org,673,DEBORAH,WALKER,2006-02-15 04:57:12,20050731,673,1,,2024-12-08T03:05:33.793Z,dbfs:/FileStore/retail/stream/fact_table/fact_table03.json
2.99,20060214,25,DEBORAH.WALKER@sakilacustomer.org,674,DEBORAH,WALKER,2006-02-15 04:57:12,20050731,674,1,,2024-12-08T03:05:33.793Z,dbfs:/FileStore/retail/stream/fact_table/fact_table03.json
5.99,20060214,25,DEBORAH.WALKER@sakilacustomer.org,675,DEBORAH,WALKER,2006-02-15 04:57:12,20050801,675,1,,2024-12-08T03:05:33.793Z,dbfs:/FileStore/retail/stream/fact_table/fact_table03.json
2.99,20060214,25,DEBORAH.WALKER@sakilacustomer.org,676,DEBORAH,WALKER,2006-02-15 04:57:12,20050802,676,1,,2024-12-08T03:05:33.793Z,dbfs:/FileStore/retail/stream/fact_table/fact_table03.json


In [0]:
(spark.table("fact_table_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{fact_table_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_table_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fb5c4b07e50>

In [0]:
##### Silver Table: Including Reference Data

In [0]:
(spark.readStream
  .table("fact_table_bronze")
  .createOrReplaceTempView("fact_table_silver_tempview"))

In [0]:
%sql
SELECT * FROM fact_table_silver_tempview

In [0]:
%sql
DESCRIBE EXTENDED fact_table_silver_tempview

col_name,data_type,comment
amount,double,
create_date_key,bigint,
customer_key,bigint,
email,string,
fact_table_key,bigint,
first_name,string,
last_name,string,
last_update,string,
payment_date_key,bigint,
payment_key,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW facts_table_silver_tempview AS (
  SELECT f.fact_table_key,
  f.customer_key,
  f.payment_key,
  f.store_key,
  f.first_name,
  f.last_name,
  f.email,
  f.amount,
  f.last_update as fact_last_update,
  f.create_date_key,
  f.payment_date_key,
  r.rental_key,
  r.rental_id,
  r.rental_date,
  r.return_date,
  c.customer_id,
  c.store_id,
  c.create_date,
  p.payment_id,
  p.staff_id,
  p.payment_date,
  cd.day_name_of_week as create_date_day_name_of_week,
  cd.day_of_month as create_date_day_of_month,
  cd.weekday_weekend as create_date_weekday_weekend,
  cd.month_name as create_date_month_name,
  cd.calendar_quarter as create_date_calendar_quarter,
  cd.calendar_year as create_date_calendar_year,
  pd.day_name_of_week as paid_date_day_name_of_week,
  pd.day_of_month as paid_date_day_of_month,
  pd.weekday_weekend as paid_date_weekday_weekend,
  pd.month_name as paid_date_month_name,
  pd.calendar_quarter as paid_date_calendar_quarter,
  pd.calendar_year as paid_date_calendar_year
  FROM fact_table_silver_tempview AS f
  INNER JOIN sakila_dlh.dim_rentals AS r
  ON r.customer_id = f.customer_key
  INNER JOIN sakila_dlh.dim_customers AS c
  ON c.customer_key = f.customer_key
  INNER JOIN sakila_dlh.dim_payments AS p
  ON p.payment_key = f.payment_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS cd
  ON cd.date_key = f.create_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS pd
  ON pd.date_key = f.payment_date_key
)

In [0]:
(spark.table("facts_table_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{fact_table_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_table_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fb5f172e710>

In [0]:
%sql
SELECT * FROM fact_table_silver

fact_table_key,customer_key,payment_key,store_key,first_name,last_name,email,amount,fact_last_update,create_date_key,payment_date_key,rental_key,rental_id,rental_date,return_date,customer_id,store_id,create_date,payment_id,staff_id,payment_date,create_date_day_name_of_week,create_date_day_of_month,create_date_weekday_weekend,create_date_month_name,create_date_calendar_quarter,create_date_calendar_year,paid_date_day_name_of_week,paid_date_day_of_month,paid_date_weekday_weekend,paid_date_month_name,paid_date_calendar_quarter,paid_date_calendar_year
512,19,512,1,RUTH,MARTINEZ,RUTH.MARTINEZ@sakilacustomer.org,0.99,2006-02-15 04:57:12,20060214,20050823,18,18,2005-05-25T01:10:47Z,2005-05-31T06:35:47Z,19,1,2006-02-14 22:04:36,513,2,2005-08-23 03:46:47,Tuesday,14,Weekday,February,1,2006,Tuesday,23,Weekday,August,3,2005
206,7,206,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,5.99,2006-02-15 04:57:12,20060214,20050821,46,46,2005-05-25T06:04:08Z,2005-06-02T08:18:08Z,7,1,2006-02-14 22:04:36,206,1,2005-08-21 04:49:48,Tuesday,14,Weekday,February,1,2006,Sunday,21,Weekend,August,3,2005
982,35,982,2,VIRGINIA,GREEN,VIRGINIA.GREEN@sakilacustomer.org,4.99,2006-02-15 04:57:12,20060214,20050821,47,47,2005-05-25T06:05:20Z,2005-05-30T03:04:20Z,35,2,2006-02-14 22:04:36,983,2,2005-08-21 22:25:09,Tuesday,14,Weekday,February,1,2006,Sunday,21,Weekend,August,3,2005
488,18,488,2,CAROL,GARCIA,CAROL.GARCIA@sakilacustomer.org,0.99,2006-02-15 04:57:12,20060214,20050820,50,50,2005-05-25T06:44:53Z,2005-05-28T11:28:53Z,18,2,2006-02-14 22:04:36,489,2,2005-08-20 01:29:29,Tuesday,14,Weekday,February,1,2006,Saturday,20,Weekend,August,3,2005
173,6,173,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,0.99,2006-02-15 04:57:12,20060214,20050823,57,57,2005-05-25T08:43:32Z,2005-05-29T06:42:32Z,6,2,2006-02-14 22:04:36,173,2,2005-08-23 06:41:32,Tuesday,14,Weekday,February,1,2006,Tuesday,23,Weekday,August,3,2005
32,1,32,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5.99,2006-02-15 04:57:12,20060214,20050822,76,76,2005-05-25T11:30:37Z,2005-06-03T12:00:37Z,1,1,2006-02-14 22:04:36,32,1,2005-08-22 20:03:46,Tuesday,14,Weekday,February,1,2006,Monday,22,Weekday,August,3,2005
666,25,666,1,DEBORAH,WALKER,DEBORAH.WALKER@sakilacustomer.org,2.99,2006-02-15 04:57:12,20060214,20050710,90,90,2005-05-25T14:31:25Z,2005-06-01T10:07:25Z,25,1,2006-02-14 22:04:36,667,1,2005-07-10 19:19:43,Tuesday,14,Weekday,February,1,2006,Sunday,10,Weekend,July,3,2005
512,19,512,1,RUTH,MARTINEZ,RUTH.MARTINEZ@sakilacustomer.org,0.99,2006-02-15 04:57:12,20060214,20050823,110,110,2005-05-25T18:43:49Z,2005-06-03T18:13:49Z,19,1,2006-02-14 22:04:36,513,2,2005-08-23 03:46:47,Tuesday,14,Weekday,February,1,2006,Tuesday,23,Weekday,August,3,2005
488,18,488,2,CAROL,GARCIA,CAROL.GARCIA@sakilacustomer.org,0.99,2006-02-15 04:57:12,20060214,20050820,116,116,2005-05-25T19:27:51Z,2005-05-26T16:23:51Z,18,2,2006-02-14 22:04:36,489,2,2005-08-20 01:29:29,Tuesday,14,Weekday,February,1,2006,Saturday,20,Weekend,August,3,2005
206,7,206,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,5.99,2006-02-15 04:57:12,20060214,20050821,117,117,2005-05-25T19:30:46Z,2005-05-31T23:59:46Z,7,1,2006-02-14 22:04:36,206,1,2005-08-21 04:49:48,Tuesday,14,Weekday,February,1,2006,Sunday,21,Weekend,August,3,2005


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_table_silver

col_name,data_type,comment
fact_table_key,bigint,
customer_key,bigint,
payment_key,bigint,
store_key,bigint,
first_name,string,
last_name,string,
email,string,
amount,double,
fact_last_update,string,
create_date_key,bigint,


In [0]:
#### Gold Table: Performning Aggregations
#### Creating a new Gold table using the CTAS approach. 

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_customer_name_total_spent_gold AS (
  SELECT customer_key AS CustomerID
    , last_name AS LastName
    , first_name AS FirstName
    , SUM(amount) AS TotalSpent
  FROM sakila_dlh.fact_table_silver
  GROUP BY CustomerID, LastName, FirstName
  ORDER BY TotalSpent DESC);

SELECT * FROM sakila_dlh.fact_customer_name_total_spent_gold;

CustomerID,LastName,FirstName,TotalSpent
19,MARTINEZ,RUTH,754.560000000001
7,MILLER,MARIA,606.6800000000009
14,WHITE,BETTY,588.6000000000008
21,CLARK,MICHELLE,466.9500000000007
16,MARTIN,SANDRA,356.1600000000005
20,ROBINSON,SHARON,347.1000000000004
22,RODRIGUEZ,LAURA,341.3400000000004
17,THOMPSON,DONNA,296.37000000000023
6,DAVIS,JENNIFER,281.1600000000004
18,GARCIA,CAROL,275.34000000000015


In [0]:
#### Clean up the File System

In [0]:
%fs rm -r /FileStore/