DS 2002: Data Project 2 (Course Capstone) - Michelle Kim

Importing necessary libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

Instantiating global variables 

In [0]:

# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ds2002-mysql-tmf8dy.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "tmf8dy",
  "password" : "Familyandsammy28??",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.bs8vmg2"
atlas_database_name = "sakila_inventory"
atlas_user_name = "tmf8dy"
atlas_password = "Familyandsammy28"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

inventory_rentals_stream_dir = f"{stream_dir}/inventory_transactions"

inventory_rentals_output_bronze = f"{database_dir}/fact_inventory_rentals/bronze"
inventory_rentals_output_silver = f"{database_dir}/fact_inventory_rentals/silver"
inventory_rentals_output_gold   = f"{database_dir}/fact_inventory_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_inventory_rentals", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

Defining global functions 

In [0]:
###################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

Fetching reference data from an Azure MySQL Database and creating a new Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

Creating a new table that sources date dimension data from a table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-tmf8dy.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "dim_date",
  user "tmf8dy",    --Replace with your User Name
  password "Familyandsammy28??"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


Creating a new table that sources film (product) dimension data from my Azure MySQL database

In [0]:
%sql
-- Create a Temporary View named "view_film" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_film
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-tmf8dy.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "dim_film",
  user "tmf8dy",    --Replace with your User Name
  password "Familyandsammy28??"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_film" using data from the view named "view_film"
CREATE OR REPLACE TABLE sakila_dlh.dim_film
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/dim_film"
AS SELECT * FROM view_film

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_key,bigint,
film_id,bigint,
title,varchar(65535),
description,varchar(65535),
release_year,varchar(65535),
language_id,bigint,
original_language_id,bigint,
rental_duration,bigint,
rental_rate,double,
length,bigint,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42
3,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,0,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15 00:03:42
4,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,0,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15 00:03:42
5,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,0,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15 00:03:42


Fetching reference data from a MongoDB Atlas Database and viewing the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/lab_data/retail/batch'

path,name,size,modificationTime
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimCustomers.json,Northwind_DimCustomers.json,10884,1715094303000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimEmployees.csv,Northwind_DimEmployees.csv,2174,1715094302000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimInvoices.json,Northwind_DimInvoices.json,6580,1715094302000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimShippers.csv,Northwind_DimShippers.csv,266,1715094303000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimSuppliers.json,Northwind_DimSuppliers.json,1552,1715094302000
dbfs:/FileStore/lab_data/retail/batch/sakila_dim_customer.json,sakila_dim_customer.json,148680,1715094365000
dbfs:/FileStore/lab_data/retail/batch/sakila_dim_film.json,sakila_dim_film.json,422420,1715094365000
dbfs:/FileStore/lab_data/retail/batch/sakila_dim_payment.csv,sakila_dim_payment.csv,926587,1715288854000
dbfs:/FileStore/lab_data/retail/batch/sakila_dim_payment2.csv,sakila_dim_payment2.csv,942632,1715094365000
dbfs:/FileStore/lab_data/retail/batch/sakila_dim_store.json,sakila_dim_store.json,213,1715094365000


Creating a New MongoDB Database and loading the JSON data into a new MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/lab_data/retail/batch'
json_files = {"customer" : 'sakila_dim_customer.json'
              , "film" : 'sakila_dim_film.json'
              , "store" : 'sakila_dim_store.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f7287e777c0>

Fetching Customer Dimension Data from the new MongoDB Collection

In [0]:
%scala

val userName = "tmf8dy"
val pwd = "Familyandsammy28"
val clusterName = "cluster0.bs8vmg2"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_inventory")
.option("collection", "customer").load()
.select("customer_key","store_id","last_name","first_name","email","active","create_date","last_update")

display(df_customer)

customer_key,store_id,last_name,first_name,email,active,create_date,last_update
1,1,SMITH,MARY,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
2,1,JOHNSON,PATRICIA,PATRICIA.JOHNSON@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
3,1,WILLIAMS,LINDA,LINDA.WILLIAMS@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
4,2,JONES,BARBARA,BARBARA.JONES@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
5,1,BROWN,ELIZABETH,ELIZABETH.BROWN@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
6,2,DAVIS,JENNIFER,JENNIFER.DAVIS@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
7,1,MILLER,MARIA,MARIA.MILLER@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
8,2,WILSON,SUSAN,SUSAN.WILSON@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
9,2,MOORE,MARGARET,MARGARET.MOORE@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
10,1,TAYLOR,DOROTHY,DOROTHY.TAYLOR@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20


In [0]:
%scala
df_customer.printSchema()

Using the Spark DataFrame to create a new Customer dimension table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
store_id,int,
last_name,string,
first_name,string,
email,string,
active,int,
create_date,string,
last_update,string,
,,
# Delta Statistics Columns,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,store_id,last_name,first_name,email,active,create_date,last_update
1,1,SMITH,MARY,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
2,1,JOHNSON,PATRICIA,PATRICIA.JOHNSON@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
3,1,WILLIAMS,LINDA,LINDA.WILLIAMS@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
4,2,JONES,BARBARA,BARBARA.JONES@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20
5,1,BROWN,ELIZABETH,ELIZABETH.BROWN@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-14 23:57:20


Fetching Store Dimension Data from the new MongoDB Collection

In [0]:
%scala
val df_store = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_inventory")
.option("collection", "store").load()
.select("store_key","manager_staff_id","address_id","last_update")

In [0]:
%scala
df_store.printSchema()

Using the Spark DataFrame to create a new Store Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_store.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_store")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_store

col_name,data_type,comment
store_key,int,
manager_staff_id,int,
address_id,int,
last_update,string,
,,
# Delta Statistics Columns,,
Column Names,"store_key, manager_staff_id, address_id, last_update",
Column Selection Method,first-32,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_store LIMIT 5

store_key,manager_staff_id,address_id,last_update
1,1,1,2006-02-14 23:57:12
2,2,2,2006-02-14 23:57:12


Fetching data from a file system and using PySpark to read from a CSV file

In [0]:
payment_csv = f"{batch_dir}/sakila_dim_payment.csv"

# Specify the delimiter as semicolon when reading the CSV
df_payment = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(payment_csv)
display(df_payment)

payment_key,payment_id,customer_id,staff_id,rental_id,amount,last_update,payment_date_key
1,1,1,1,76,2.99,2006-02-15 17:12:30,20050525
2,2,1,1,573,0.99,2006-02-15 17:12:30,20050528
3,3,1,1,1185,5.99,2006-02-15 17:12:30,20050615
4,4,1,2,1422,0.99,2006-02-15 17:12:30,20050615
5,5,1,2,1476,9.99,2006-02-15 17:12:30,20050615
6,6,1,1,1725,4.99,2006-02-15 17:12:30,20050616
7,7,1,1,2308,4.99,2006-02-15 17:12:30,20050618
8,8,1,2,2363,0.99,2006-02-15 17:12:30,20050618
9,9,1,1,3284,3.99,2006-02-15 17:12:30,20050621
10,10,1,2,4526,5.99,2006-02-15 17:12:30,20050708


In [0]:
# Specify the delimiter as semicolon when reading the CSV
df_payment = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(payment_csv)
df_payment.printSchema()

root
 |-- payment_key: string (nullable = true)
 |-- payment_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- staff_id: string (nullable = true)
 |-- rental_id: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- last_update: string (nullable = true)
 |-- payment_date_key: string (nullable = true)



In [0]:
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_payment")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payment;

col_name,data_type,comment
payment_key,string,
payment_id,string,
customer_id,string,
staff_id,string,
rental_id,string,
amount,string,
last_update,string,
payment_date_key,string,
,,
# Delta Statistics Columns,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_payment LIMIT 5;

payment_key,payment_id,customer_id,staff_id,rental_id,amount,last_update,payment_date_key
1,1,1,1,76,2.99,2006-02-15 17:12:30,20050525
2,2,1,1,573,0.99,2006-02-15 17:12:30,20050528
3,3,1,1,1185,5.99,2006-02-15 17:12:30,20050615
4,4,1,2,1422,0.99,2006-02-15 17:12:30,20050615
5,5,1,2,1476,9.99,2006-02-15 17:12:30,20050615


Verifying Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_payment,False
sakila_dlh,dim_store,False
,view_date,True
,view_film,True


Integrating Reference Data with real-time data, using autoLoader to process Streaming (Hot Path) Inventory Rents Fact Data, Creating Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", inventory_rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(inventory_rentals_stream_dir)
 .createOrReplaceTempView("inventory_rentals_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW inventory_rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM inventory_rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM inventory_rentals_bronze_tempview 

fact_inventory_rental_key,fact_inventory_transaction_key,film_key,inventory_date_key,inventory_id,inventory_transaction_key,inventory_transaction_type,last_update,length,order_key,product_key,purchase_order_key,quantity,rental_duration,rental_rate,replacement_cost,store_id,title,transaction_created_date_key,transaction_modified_date_key,_rescued_data,receipt_time,source_file
3055,,670,20060215,3055,,,2006-02-15 00:03:42,48,,,,,4,4.99,17.99,2,PELICAN COMFORTS,,,,2024-05-10T02:06:43.474Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3056,,672,20060215,3056,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,1,PERFECT GROOVE,,,,2024-05-10T02:06:43.474Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3057,,672,20060215,3057,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,1,PERFECT GROOVE,,,,2024-05-10T02:06:43.474Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3058,,672,20060215,3058,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,2,PERFECT GROOVE,,,,2024-05-10T02:06:43.474Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3059,,672,20060215,3059,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,2,PERFECT GROOVE,,,,2024-05-10T02:06:43.474Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3060,,672,20060215,3060,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,2,PERFECT GROOVE,,,,2024-05-10T02:06:43.474Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3061,,672,20060215,3061,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,2,PERFECT GROOVE,,,,2024-05-10T02:06:43.474Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3062,,673,20060215,3062,,,2006-02-15 00:03:42,118,,,,,3,0.99,19.99,1,PERSONAL LADYBUGS,,,,2024-05-10T02:06:43.474Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3063,,673,20060215,3063,,,2006-02-15 00:03:42,118,,,,,3,0.99,19.99,1,PERSONAL LADYBUGS,,,,2024-05-10T02:06:43.474Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3064,,673,20060215,3064,,,2006-02-15 00:03:42,118,,,,,3,0.99,19.99,2,PERSONAL LADYBUGS,,,,2024-05-10T02:06:43.474Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json


In [0]:
(spark.table("inventory_rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inventory_rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_rentals_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f72844702d0>

Creating Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_inventory_rentals_bronze")
  .createOrReplaceTempView("inventory_rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM inventory_rentals_silver_tempview

fact_inventory_rental_key,fact_inventory_transaction_key,film_key,inventory_date_key,inventory_id,inventory_transaction_key,inventory_transaction_type,last_update,length,order_key,product_key,purchase_order_key,quantity,rental_duration,rental_rate,replacement_cost,store_id,title,transaction_created_date_key,transaction_modified_date_key,_rescued_data,receipt_time,source_file
3055,,670,20060215,3055,,,2006-02-15 00:03:42,48,,,,,4,4.99,17.99,2,PELICAN COMFORTS,,,,2024-05-10T02:06:34.919Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3056,,672,20060215,3056,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,1,PERFECT GROOVE,,,,2024-05-10T02:06:34.919Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3057,,672,20060215,3057,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,1,PERFECT GROOVE,,,,2024-05-10T02:06:34.919Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3058,,672,20060215,3058,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,2,PERFECT GROOVE,,,,2024-05-10T02:06:34.919Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3059,,672,20060215,3059,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,2,PERFECT GROOVE,,,,2024-05-10T02:06:34.919Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3060,,672,20060215,3060,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,2,PERFECT GROOVE,,,,2024-05-10T02:06:34.919Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3061,,672,20060215,3061,,,2006-02-15 00:03:42,82,,,,,7,2.99,17.99,2,PERFECT GROOVE,,,,2024-05-10T02:06:34.919Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3062,,673,20060215,3062,,,2006-02-15 00:03:42,118,,,,,3,0.99,19.99,1,PERSONAL LADYBUGS,,,,2024-05-10T02:06:34.919Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3063,,673,20060215,3063,,,2006-02-15 00:03:42,118,,,,,3,0.99,19.99,1,PERSONAL LADYBUGS,,,,2024-05-10T02:06:34.919Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json
3064,,673,20060215,3064,,,2006-02-15 00:03:42,118,,,,,3,0.99,19.99,2,PERSONAL LADYBUGS,,,,2024-05-10T02:06:34.919Z,dbfs:/FileStore/lab_data/retail/stream/inventory_transactions/sakila_fact_inventory_rentals_03.json


In [0]:
%sql
DESCRIBE EXTENDED inventory_rentals_silver_tempview

col_name,data_type,comment
fact_inventory_rental_key,bigint,
fact_inventory_transaction_key,bigint,
film_key,bigint,
inventory_date_key,bigint,
inventory_id,bigint,
inventory_transaction_key,bigint,
inventory_transaction_type,string,
last_update,string,
length,bigint,
order_key,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_inventory_rentals_silver_tempview AS (
    SELECT 
        ir.fact_inventory_rental_key,
        ir.inventory_id,
        f.title AS film_title,
        f.rental_rate,
        rd.full_date AS rental_date,
        rd.calendar_quarter AS rental_calendar_quarter,
        rd.fiscal_quarter AS rental_fiscal_quarter,
        rd.calendar_quarter AS return_calendar_quarter,
        rd.fiscal_quarter AS return_fiscal_quarter,
        s.store_key,
        p.amount AS payment_amount,
        p.payment_id
    FROM inventory_rentals_silver_tempview AS ir
    INNER JOIN sakila_dlh.dim_film AS f
        ON f.film_key = ir.film_key
    INNER JOIN sakila_dlh.dim_store AS s
        ON s.store_key = ir.store_id
    INNER JOIN sakila_dlh.dim_payment AS p
        ON p.payment_id = ir.purchase_order_key
    INNER JOIN sakila_dlh.dim_date AS rd  -- Rental date
        ON rd.date_key = ir.inventory_date_key
)


In [0]:
(spark.table("fact_inventory_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inventory_rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_rentals_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f72843e0150>

In [0]:
%sql
SELECT * FROM fact_inventory_rentals_silver

fact_inventory_rental_key,inventory_id,film_title,rental_rate,rental_date,rental_calendar_quarter,rental_fiscal_quarter,return_calendar_quarter,return_fiscal_quarter,store_key,payment_amount,payment_id


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_inventory_rentals_silver

col_name,data_type,comment
fact_inventory_rental_key,bigint,
inventory_id,bigint,
film_title,varchar(65535),
rental_rate,double,
rental_date,date,
rental_calendar_quarter,tinyint,
rental_fiscal_quarter,tinyint,
return_calendar_quarter,tinyint,
return_fiscal_quarter,tinyint,
store_key,int,


Creating gold table: performing aggregations and using the CTAS approach

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_rentals_by_store_gold AS (
  SELECT store_id AS StoreID
    , title AS MovieTitle
    , rental_duration * rental_rate AS TotalCost
  FROM sakila_dlh.fact_inventory_rentals_bronze
  GROUP BY StoreID, MovieTitle, TotalCost
  ORDER BY TotalCost DESC);

SELECT * FROM sakila_dlh.fact_rentals_by_store_gold;

StoreID,MovieTitle,TotalCost
1.0,STEPMOM DREAM,34.93
1.0,WASH HEAVENLY,34.93
2.0,GRINCH MASSAGE,34.93
1.0,PEAK FOREVER,34.93
1.0,WANDA CHAMBER,34.93
1.0,SLEEPING SUSPECTS,34.93
2.0,SLEEPING SUSPECTS,34.93
1.0,WAR NOTTING,34.93
2.0,JET NEIGHBORS,34.93
2.0,LEGALLY SECRETARY,34.93


Cleaning up the file system

In [0]:
%fs rm -r /FileStore/lab_data/