## Lab 06: Data Lakehouse with Structured Streaming
This lab will help you learn to use many of the software libraries and programming techniques required to fulfill the requirements of the final end-of-session capstone project for course **DS-2002: Data Systems**. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ssa4ec-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "northwind_dw2"

connection_properties = {
  "user" : "ssa4ec",
  "password" : "Passw0rd123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.lsmgeeb"
atlas_database_name = "northwind_dw"
atlas_user_name = "ssa4ec"
atlas_password = "Passw0rd123"

mongo_uri = f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net/{atlas_database_name}"
print(mongo_uri)

# Data Files (JSON) Information ###############################
dst_database = "northwind_dlh"

base_dir = "dbfs:/FileStore/ds2002-lab06"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

mongodb+srv://ssa4ec:Passw0rd123@cluster0.lsmgeeb.mongodb.net/northwind_dw
Out[30]: True

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS northwind_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS northwind_dlh
COMMENT "DS-2002 Lab 06 Database"
LOCATION "dbfs:/FileStore/ds2002-lab06/northwind_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Lab 6.0");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ssa4ec-mysql.mysql.database.azure.com:3306/northwind_dw",
  dbtable "dim_date",
  user "sabdulali",
  password "Passw0rd123"
)


In [0]:
%sql
USE DATABASE northwind_dlh;

CREATE OR REPLACE TABLE northwind_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/northwind_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_product" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ssa4ec-mysql.mysql.database.azure.com:3306/northwind_dw",
  dbtable "dim_products",
  user "sabdulali",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE northwind_dlh;
-- Create a new table named "northwind_dlh.dim_product" using data from the view named "view_product"

CREATE OR REPLACE TABLE northwind_dlh.dim_product
COMMENT "Product Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/northwind_dlh/dim_product"
AS SELECT * FROM view_product
-- Create a new table named "northwind_dlh.dim_product" using data from the view named "view_product"

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_product;

col_name,data_type,comment
product_key,bigint,
product_code,string,
product_name,string,
standard_cost,double,
list_price,double,
reorder_level,bigint,
target_level,bigint,
quantity_per_unit,string,
discontinued,bigint,
minimum_reorder_quantity,double,


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_product LIMIT 5

product_key,product_code,product_name,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category
1,NWTB-1,Northwind Traders Chai,13.5,18.0,10,40,10 boxes x 20 bags,0,10.0,Beverages
3,NWTCO-3,Northwind Traders Syrup,7.5,10.0,25,100,12 - 550 ml bottles,0,25.0,Condiments
4,NWTCO-4,Northwind Traders Cajun Seasoning,16.5,22.0,10,40,48 - 6 oz jars,0,10.0,Condiments
5,NWTO-5,Northwind Traders Olive Oil,16.0125,21.35,10,40,36 boxes,0,10.0,Oil
6,NWTJP-6,Northwind Traders Boysenberry Spread,18.75,25.0,25,100,12 - 8 oz jars,0,25.0,"Jams, Preserves"


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-lab06/source_data/batch/Northwind_DimCustomers.json,Northwind_DimCustomers.json,10476,1682611167000
dbfs:/FileStore/ds2002-lab06/source_data/batch/Northwind_DimEmployees.csv,Northwind_DimEmployees.csv,2164,1682611167000
dbfs:/FileStore/ds2002-lab06/source_data/batch/Northwind_DimInvoices.json,Northwind_DimInvoices.json,6263,1682611167000
dbfs:/FileStore/ds2002-lab06/source_data/batch/Northwind_DimShippers.csv,Northwind_DimShippers.csv,262,1682611167000
dbfs:/FileStore/ds2002-lab06/source_data/batch/Northwind_DimSuppliers.json,Northwind_DimSuppliers.json,1480,1682611168000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002-lab06/source_data/batch'
json_files = {"customers" : 'Northwind_DimCustomers.json', "suppliers" : 'Northwind_DimSuppliers.json', "invoices" : 'Northwind_DimInvoices.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[43]: <pymongo.results.InsertManyResult at 0x7fc6e5c47300>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "northwind_dw").option("collection", "customers").load()
.select("customer_key","company","last_name","first_name","job_title","business_phone","fax_number","address","city","state_province","zip_postal_code","country_region")

display(df_customer)

customer_key,company,last_name,first_name,job_title,business_phone,fax_number,address,city,state_province,zip_postal_code,country_region
1,Company A,Bedecs,Anna,Owner,(123)555-0100,(123)555-0101,123 1st Street,Seattle,WA,99999,USA
2,Company B,Gratacos Solsona,Antonio,Owner,(123)555-0100,(123)555-0101,123 2nd Street,Boston,MA,99999,USA
3,Company C,Axen,Thomas,Purchasing Representative,(123)555-0100,(123)555-0101,123 3rd Street,Los Angelas,CA,99999,USA
4,Company D,Lee,Christina,Purchasing Manager,(123)555-0100,(123)555-0101,123 4th Street,New York,NY,99999,USA
5,Company E,O’Donnell,Martin,Owner,(123)555-0100,(123)555-0101,123 5th Street,Minneapolis,MN,99999,USA
6,Company F,Pérez-Olaeta,Francisco,Purchasing Manager,(123)555-0100,(123)555-0101,123 6th Street,Milwaukee,WI,99999,USA
7,Company G,Xie,Ming-Yang,Owner,(123)555-0100,(123)555-0101,123 7th Street,Boise,ID,99999,USA
8,Company H,Andersen,Elizabeth,Purchasing Representative,(123)555-0100,(123)555-0101,123 8th Street,Portland,OR,99999,USA
9,Company I,Mortensen,Sven,Purchasing Manager,(123)555-0100,(123)555-0101,123 9th Street,Salt Lake City,UT,99999,USA
10,Company J,Wacker,Roland,Purchasing Manager,(123)555-0100,(123)555-0101,123 10th Street,Chicago,IL,99999,USA


In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
company,string,
last_name,string,
first_name,string,
job_title,string,
business_phone,string,
fax_number,string,
address,string,
city,string,
state_province,string,


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_customer LIMIT 5

customer_key,company,last_name,first_name,job_title,business_phone,fax_number,address,city,state_province,zip_postal_code,country_region
1,Company A,Bedecs,Anna,Owner,(123)555-0100,(123)555-0101,123 1st Street,Seattle,WA,99999,USA
2,Company B,Gratacos Solsona,Antonio,Owner,(123)555-0100,(123)555-0101,123 2nd Street,Boston,MA,99999,USA
3,Company C,Axen,Thomas,Purchasing Representative,(123)555-0100,(123)555-0101,123 3rd Street,Los Angelas,CA,99999,USA
4,Company D,Lee,Christina,Purchasing Manager,(123)555-0100,(123)555-0101,123 4th Street,New York,NY,99999,USA
5,Company E,O’Donnell,Martin,Owner,(123)555-0100,(123)555-0101,123 5th Street,Minneapolis,MN,99999,USA


##### 2.4.1 Fetch Supplier Dimension Data from the New MongoDB Collection

In [0]:
%scala
val df_supplier = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "northwind_dw").option("collection", "suppliers").load()

display(df_supplier)

_id,company,first_name,job_title,last_name,supplier_key
List(644aaf5856695574aee0fa52),Supplier A,Elizabeth A.,Sales Manager,Andersen,1
List(644aaf5856695574aee0fa53),Supplier B,Cornelia,Sales Manager,Weiler,2
List(644aaf5856695574aee0fa54),Supplier C,Madeleine,Sales Representative,Kelley,3
List(644aaf5856695574aee0fa55),Supplier D,Naoki,Marketing Manager,Sato,4
List(644aaf5856695574aee0fa56),Supplier E,Amaya,Sales Manager,Hernandez-Echevarria,5
List(644aaf5856695574aee0fa57),Supplier F,Satomi,Marketing Assistant,Hayakawa,6
List(644aaf5856695574aee0fa58),Supplier G,Stuart,Marketing Manager,Glasson,7
List(644aaf5856695574aee0fa59),Supplier H,Bryn Paul,Sales Representative,Dunton,8
List(644aaf5856695574aee0fa5a),Supplier I,Mikael,Sales Manager,Sandberg,9
List(644aaf5856695574aee0fa5b),Supplier J,Luis,Sales Manager,Sousa,10


In [0]:
%scala
df_supplier.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_supplier")


##### 2.4.2. Use the Spark DataFrame to Create a New Suppliers Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_supplier.printSchema()

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_supplier

col_name,data_type,comment
_id,struct,
company,string,
first_name,string,
job_title,string,
last_name,string,
supplier_key,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,northwind_dlh,


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_supplier LIMIT 5

_id,company,first_name,job_title,last_name,supplier_key
List(644aaf5856695574aee0fa52),Supplier A,Elizabeth A.,Sales Manager,Andersen,1
List(644aaf5856695574aee0fa53),Supplier B,Cornelia,Sales Manager,Weiler,2
List(644aaf5856695574aee0fa54),Supplier C,Madeleine,Sales Representative,Kelley,3
List(644aaf5856695574aee0fa55),Supplier D,Naoki,Marketing Manager,Sato,4
List(644aaf5856695574aee0fa56),Supplier E,Amaya,Sales Manager,Hernandez-Echevarria,5


##### 2.5.1 Fetch Invoice Dimension Data from teh New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_invoice = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "northwind_dw").option("collection", "invoices").option("uri", "mongodb+srv://ssa4ec:Passw0rd123@cluster0.lsmgeeb.mongodb.net/").load()
.select("invoice_key","order_key","invoice_date","due_date","tax","shipping","amount_due")

display(df_invoice)

invoice_key,order_key,invoice_date,due_date,tax,shipping,amount_due
5,31,2006-03-22 16:08:59,2006-04-04 11:43:08,0,0,0
6,32,2006-03-22 16:10:27,2006-04-04 11:43:08,0,0,0
7,40,2006-03-24 10:41:41,2006-04-04 11:43:08,0,0,0
8,39,2006-03-24 10:55:46,2006-04-04 11:43:08,0,0,0
9,38,2006-03-24 10:56:57,2006-04-04 11:43:08,0,0,0
10,37,2006-03-24 10:57:38,2006-04-04 11:43:08,0,0,0
11,36,2006-03-24 10:58:40,2006-04-04 11:43:08,0,0,0
12,35,2006-03-24 10:59:41,2006-04-04 11:43:08,0,0,0
13,34,2006-03-24 11:00:55,2006-04-04 11:43:08,0,0,0
14,33,2006-03-24 11:02:02,2006-04-04 11:43:08,0,0,0


In [0]:
%scala
df_invoice.printSchema()

##### 2.5.2. Use the Spark DataFrame to Create a New Invoices Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_invoice.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_invoice")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_invoice

col_name,data_type,comment
invoice_key,int,
order_key,int,
invoice_date,string,
due_date,string,
tax,int,
shipping,int,
amount_due,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_invoice LIMIT 5

invoice_key,order_key,invoice_date,due_date,tax,shipping,amount_due
5,31,2006-03-22 16:08:59,2006-04-04 11:43:08,0,0,0
6,32,2006-03-22 16:10:27,2006-04-04 11:43:08,0,0,0
7,40,2006-03-24 10:41:41,2006-04-04 11:43:08,0,0,0
8,39,2006-03-24 10:55:46,2006-04-04 11:43:08,0,0,0
9,38,2006-03-24 10:56:57,2006-04-04 11:43:08,0,0,0


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
employee_csv = f"{batch_dir}/Northwind_DimEmployees.csv"

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
display(df_employee)

employee_key,company,last_name,first_name,email_address,job_title,business_phone,home_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page
1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#
2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
3,Northwind Traders,Kotas,Jan,jan@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 3rd Avenue,Redmond,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
4,Northwind Traders,Sergienko,Mariya,mariya@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 4th Avenue,Kirkland,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
5,Northwind Traders,Thorpe,Steven,steven@northwindtraders.com,Sales Manager,(123)555-0100,(123)555-0102,(123)555-0103,123 5th Avenue,Seattle,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
6,Northwind Traders,Neipper,Michael,michael@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 6th Avenue,Redmond,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
7,Northwind Traders,Zare,Robert,robert@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 7th Avenue,Seattle,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
8,Northwind Traders,Giussani,Laura,laura@northwindtraders.com,Sales Coordinator,(123)555-0100,(123)555-0102,(123)555-0103,123 8th Avenue,Redmond,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
9,Northwind Traders,Hellung-Larsen,Anne,anne@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 9th Avenue,Seattle,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#


In [0]:
df_employee.printSchema()

root
 |-- employee_key: integer (nullable = true)
 |-- company: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- email_address: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- business_phone: string (nullable = true)
 |-- home_phone: string (nullable = true)
 |-- fax_number: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_province: string (nullable = true)
 |-- zip_postal_code: integer (nullable = true)
 |-- country_region: string (nullable = true)
 |-- web_page: string (nullable = true)



In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_employee;

col_name,data_type,comment
employee_key,int,
company,string,
last_name,string,
first_name,string,
email_address,string,
job_title,string,
business_phone,string,
home_phone,string,
fax_number,string,
address,string,


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_employee LIMIT 5;

employee_key,company,last_name,first_name,email_address,job_title,business_phone,home_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page
1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#
2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
3,Northwind Traders,Kotas,Jan,jan@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 3rd Avenue,Redmond,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
4,Northwind Traders,Sergienko,Mariya,mariya@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 4th Avenue,Kirkland,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
5,Northwind Traders,Thorpe,Steven,steven@northwindtraders.com,Sales Manager,(123)555-0100,(123)555-0102,(123)555-0103,123 5th Avenue,Seattle,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#


##### 3.2 Use PySpark to Read Shipper Dimension Data from CSV File

In [0]:
shipper_csv = f"{batch_dir}/Northwind_DimShippers.csv"

df_shipper = spark.read.format('csv').options(header='true', inferSchema='true').load(shipper_csv)
display(df_shipper)

shipper_key,company,address,city,state_province,zip_postal_code,country_region
1,Shipping Company A,123 Any Street,Memphis,TN,99999,USA
2,Shipping Company B,123 Any Street,Memphis,TN,99999,USA
3,Shipping Company C,123 Any Street,Memphis,TN,99999,USA


In [0]:
df_shipper.printSchema()

root
 |-- shipper_key: integer (nullable = true)
 |-- company: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_province: string (nullable = true)
 |-- zip_postal_code: integer (nullable = true)
 |-- country_region: string (nullable = true)



In [0]:
df_shipper.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_shipper")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_shipper;

col_name,data_type,comment
shipper_key,int,
company,string,
address,string,
city,string,
state_province,string,
zip_postal_code,int,
country_region,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_shipper LIMIT 5;

shipper_key,company,address,city,state_province,zip_postal_code,country_region
1,Shipping Company A,123 Any Street,Memphis,TN,99999,USA
2,Shipping Company B,123 Any Street,Memphis,TN,99999,USA
3,Shipping Company C,123 Any Street,Memphis,TN,99999,USA


##### Verify Dimension Tables

In [0]:
%sql
USE northwind_dlh;
SHOW TABLES

database,tableName,isTemporary
northwind_dlh,dim_customer,False
northwind_dlh,dim_date,False
northwind_dlh,dim_employee,False
northwind_dlh,dim_invoice,False
northwind_dlh,dim_product,False
northwind_dlh,dim_shipper,False
northwind_dlh,dim_supplier,False
,display_query_1,True
,display_query_2,True
,orders_bronze_tempview,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 .option("cloudFiles.schemaHints", "order_key BIGINT")
 .option("cloudFiles.schemaHints", "employee_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT") 
 .option("cloudFiles.schemaHints", "product_key BIGINT")
 .option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 .option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 .option("cloudFiles.schemaHints", "quantity DECIMAL")
 .option("cloudFiles.schemaHints", "unit_price DECIMAL")
 .option("cloudFiles.schemaHints", "discount DECIMAL")
 .option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 .option("cloudFiles.schemaHints", "taxes DECIMAL")
 .option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 .option("cloudFiles.schemaHints", "payment_type STRING")
 .option("cloudFiles.schemaHints", "order_status STRING")
 .option("cloudFiles.schemaHints", "order_details_status STRING")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

customer_key,discount,employee_key,fact_order_key,order_date_key,order_details_status,order_key,order_status,paid_date_key,payment_type,product_key,quantity,shipped_date_key,shipper_key,shipping_fee,tax_rate,taxes,unit_price,_rescued_data,receipt_time,source_file
9.0,0.0,7.0,39.0,20060605.0,Invoiced,73.0,Closed,20060605.0,Check,41.0,10.0,20060605.0,1.0,100.0,0.0,0.0,9.65,,2023-04-27T17:24:32.432+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders03.json
28.0,0.0,1.0,40.0,20060607.0,Invoiced,72.0,Closed,20060607.0,Credit Card,43.0,5.0,20060607.0,3.0,40.0,0.0,0.0,46.0,,2023-04-27T17:24:32.432+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders03.json
1.0,0.0,1.0,41.0,20060524.0,Invoiced,71.0,New,,,40.0,40.0,,3.0,0.0,0.0,0.0,18.4,,2023-04-27T17:24:32.432+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders03.json
11.0,0.0,1.0,42.0,20060524.0,Invoiced,70.0,New,,,8.0,20.0,,3.0,0.0,0.0,0.0,40.0,,2023-04-27T17:24:32.432+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders03.json
10.0,0.0,1.0,43.0,20060524.0,Invoiced,69.0,New,,,80.0,15.0,,1.0,0.0,0.0,0.0,3.5,,2023-04-27T17:24:32.432+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders03.json
10.0,0.0,4.0,44.0,20060524.0,Invoiced,67.0,Closed,20060524.0,Credit Card,74.0,20.0,20060524.0,2.0,9.0,0.0,0.0,10.0,,2023-04-27T17:24:32.432+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders03.json
8.0,0.0,6.0,45.0,20060430.0,Invoiced,60.0,Closed,20060430.0,Credit Card,72.0,40.0,20060430.0,3.0,50.0,0.0,0.0,34.8,,2023-04-27T17:24:32.432+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders03.json
3.0,0.0,4.0,46.0,20060425.0,Invoiced,63.0,Closed,20060425.0,Cash,3.0,50.0,20060425.0,2.0,7.0,0.0,0.0,10.0,,2023-04-27T17:24:32.432+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders03.json
3.0,0.0,4.0,47.0,20060425.0,Invoiced,63.0,Closed,20060425.0,Cash,8.0,3.0,20060425.0,2.0,7.0,0.0,0.0,40.0,,2023-04-27T17:24:32.432+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders03.json
4.0,0.0,3.0,48.0,20060422.0,Invoiced,58.0,Closed,20060422.0,Credit Card,20.0,40.0,20060422.0,1.0,5.0,0.0,0.0,81.0,,2023-04-27T17:24:32.432+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders03.json


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

Out[64]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fc6e5be7370>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

customer_key,discount,employee_key,fact_order_key,order_date_key,order_details_status,order_key,order_status,paid_date_key,payment_type,product_key,quantity,shipped_date_key,shipper_key,shipping_fee,tax_rate,taxes,unit_price,_rescued_data,receipt_time,source_file
28.0,0.0,1.0,20.0,20060407.0,Invoiced,45.0,Closed,20060407.0,Credit Card,40.0,50.0,20060407.0,3.0,40.0,0.0,0.0,18.4,,2023-04-27T17:24:57.030+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders02.json
9.0,0.0,7.0,21.0,20060405.0,Invoiced,46.0,Closed,20060405.0,Check,57.0,100.0,20060405.0,1.0,100.0,0.0,0.0,19.5,,2023-04-27T17:24:57.030+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders02.json
9.0,0.0,7.0,22.0,20060405.0,Invoiced,46.0,Closed,20060405.0,Check,72.0,50.0,20060405.0,1.0,100.0,0.0,0.0,34.8,,2023-04-27T17:24:57.030+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders02.json
6.0,0.0,6.0,23.0,20060408.0,Invoiced,47.0,Closed,20060408.0,Credit Card,34.0,300.0,20060408.0,2.0,300.0,0.0,0.0,14.0,,2023-04-27T17:24:57.030+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders02.json
8.0,0.0,4.0,24.0,20060405.0,Invoiced,48.0,Closed,20060405.0,Check,8.0,25.0,20060405.0,2.0,50.0,0.0,0.0,40.0,,2023-04-27T17:24:57.030+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders02.json
8.0,0.0,4.0,25.0,20060405.0,Invoiced,48.0,Closed,20060405.0,Check,19.0,25.0,20060405.0,2.0,50.0,0.0,0.0,9.2,,2023-04-27T17:24:57.030+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders02.json
25.0,0.0,9.0,26.0,20060405.0,Invoiced,50.0,Closed,20060405.0,Cash,21.0,20.0,20060405.0,1.0,5.0,0.0,0.0,10.0,,2023-04-27T17:24:57.030+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders02.json
26.0,0.0,9.0,27.0,20060405.0,Invoiced,51.0,Closed,20060405.0,Credit Card,5.0,25.0,20060405.0,3.0,60.0,0.0,0.0,21.35,,2023-04-27T17:24:57.030+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders02.json
26.0,0.0,9.0,28.0,20060405.0,Invoiced,51.0,Closed,20060405.0,Credit Card,41.0,30.0,20060405.0,3.0,60.0,0.0,0.0,9.65,,2023-04-27T17:24:57.030+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders02.json
26.0,0.0,9.0,29.0,20060405.0,Invoiced,51.0,Closed,20060405.0,Credit Card,40.0,30.0,20060405.0,3.0,60.0,0.0,0.0,18.4,,2023-04-27T17:24:57.030+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/orders/Northwind_FactOrders02.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customer_key,bigint,
discount,bigint,
employee_key,bigint,
fact_order_key,bigint,
order_date_key,bigint,
order_details_status,string,
order_key,bigint,
order_status,string,
paid_date_key,bigint,
payment_type,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
      o.order_key,
      o.employee_key,
      e.last_name AS employee_last_name,
      e.first_name AS employee_first_name,
      e.job_title AS employee_job_title,
      e.company AS employee_company,
      o.customer_key,
      c.last_name AS customer_last_name,
      c.first_name AS customer_first_name,
      o.product_key,
      p.product_code,
      p.product_name,
      p.standard_cost AS product_standard_cost,
      p.list_price AS product_list_price,
      p.category AS product_category,
      o.shipper_key,
      s.company AS shipper_company,
      s.state_province AS shipper_state_province,
      s.country_region AS shipper_country_region,
      o.order_date_key,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year,
      o.paid_date_key,
      pd.day_name_of_week AS paid_day_name_of_week,
      pd.day_of_month AS paid_day_of_month,
      pd.weekday_weekend AS paid_weekday_weekend,
      pd.month_name AS paid_month_name,
      pd.calendar_quarter AS paid_calendar_quarter,
      pd.calendar_year AS paid_calendar_year,
      o.shipped_date_key,
      sd.day_name_of_week AS shipped_day_name_of_week,
      sd.day_of_month AS shipped_day_of_month,
      sd.weekday_weekend AS shipped_weekday_weekend,
      sd.month_name AS shipped_month_name,
      sd.calendar_quarter AS shipped_calendar_quarter,
      sd.calendar_year AS shipped_calendar_year,
      o.quantity,
      o.unit_price,
      o.discount,
      o.shipping_fee,
      o.taxes,
      o.tax_rate,
      o.payment_type,
      o.order_status,
      o.order_details_status
  FROM orders_silver_tempview AS o
  INNER JOIN northwind_dlh.dim_employee AS e
  ON e.employee_key = o.employee_key
  INNER JOIN northwind_dlh.dim_customer AS c
  ON c.customer_key = o.customer_key
  INNER JOIN northwind_dlh.dim_product AS p
  ON p.product_key = o.product_key
  INNER JOIN northwind_dlh.dim_shipper AS s
  ON s.shipper_key = o.shipper_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS od
  ON od.date_key = o.order_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS pd
  ON pd.date_key = o.paid_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS sd
  ON sd.date_key = o.shipped_date_key
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

Out[69]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fc6e4ada6a0>

In [0]:
%sql
SELECT * FROM fact_orders_silver

fact_order_key,order_key,employee_key,employee_last_name,employee_first_name,employee_job_title,employee_company,customer_key,customer_last_name,customer_first_name,product_key,product_code,product_name,product_standard_cost,product_list_price,product_category,shipper_key,shipper_company,shipper_state_province,shipper_country_region,order_date_key,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,paid_date_key,paid_day_name_of_week,paid_day_of_month,paid_weekday_weekend,paid_month_name,paid_calendar_quarter,paid_calendar_year,shipped_date_key,shipped_day_name_of_week,shipped_day_of_month,shipped_weekday_weekend,shipped_month_name,shipped_calendar_quarter,shipped_calendar_year,quantity,unit_price,discount,shipping_fee,taxes,tax_rate,payment_type,order_status,order_details_status
20,45,1,Freehafer,Nancy,Sales Representative,Northwind Traders,28,Raghav,Amritansh,40,NWTCM-40,Northwind Traders Crab Meat,13.8,18.4,Canned Meat,3,Shipping Company C,TN,USA,20060407,Friday,7,Weekday,April,2,2006,20060407.0,Friday,7.0,Weekday,April,2.0,2006.0,20060407.0,Friday,7.0,Weekday,April,2.0,2006.0,50,18.4,0,40,0,0,Credit Card,Closed,Invoiced
21,46,7,Zare,Robert,Sales Representative,Northwind Traders,9,Mortensen,Sven,57,NWTP-57,Northwind Traders Ravioli,14.625,19.5,Pasta,1,Shipping Company A,TN,USA,20060405,Wednesday,5,Weekday,April,2,2006,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,100,19.5,0,100,0,0,Check,Closed,Invoiced
22,46,7,Zare,Robert,Sales Representative,Northwind Traders,9,Mortensen,Sven,72,NWTD-72,Northwind Traders Mozzarella,26.1,34.8,Dairy products,1,Shipping Company A,TN,USA,20060405,Wednesday,5,Weekday,April,2,2006,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,50,34.8,0,100,0,0,Check,Closed,Invoiced
23,47,6,Neipper,Michael,Sales Representative,Northwind Traders,6,Pérez-Olaeta,Francisco,34,NWTB-34,Northwind Traders Beer,10.5,14.0,Beverages,2,Shipping Company B,TN,USA,20060408,Saturday,8,Weekend,April,2,2006,20060408.0,Saturday,8.0,Weekend,April,2.0,2006.0,20060408.0,Saturday,8.0,Weekend,April,2.0,2006.0,300,14.0,0,300,0,0,Credit Card,Closed,Invoiced
24,48,4,Sergienko,Mariya,Sales Representative,Northwind Traders,8,Andersen,Elizabeth,8,NWTS-8,Northwind Traders Curry Sauce,30.0,40.0,Sauces,2,Shipping Company B,TN,USA,20060405,Wednesday,5,Weekday,April,2,2006,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,25,40.0,0,50,0,0,Check,Closed,Invoiced
25,48,4,Sergienko,Mariya,Sales Representative,Northwind Traders,8,Andersen,Elizabeth,19,NWTBGM-19,Northwind Traders Chocolate Biscuits Mix,6.9,9.2,Baked Goods & Mixes,2,Shipping Company B,TN,USA,20060405,Wednesday,5,Weekday,April,2,2006,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,25,9.2,0,50,0,0,Check,Closed,Invoiced
26,50,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,25,Rodman,John,21,NWTBGM-21,Northwind Traders Scones,7.5,10.0,Baked Goods & Mixes,1,Shipping Company A,TN,USA,20060405,Wednesday,5,Weekday,April,2,2006,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,20,10.0,0,5,0,0,Cash,Closed,Invoiced
27,51,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,26,Liu,Run,5,NWTO-5,Northwind Traders Olive Oil,16.0125,21.35,Oil,3,Shipping Company C,TN,USA,20060405,Wednesday,5,Weekday,April,2,2006,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,25,21.35,0,60,0,0,Credit Card,Closed,Invoiced
28,51,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,26,Liu,Run,41,NWTSO-41,Northwind Traders Clam Chowder,7.2375,9.65,Soups,3,Shipping Company C,TN,USA,20060405,Wednesday,5,Weekday,April,2,2006,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,30,9.65,0,60,0,0,Credit Card,Closed,Invoiced
29,51,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,26,Liu,Run,40,NWTCM-40,Northwind Traders Crab Meat,13.8,18.4,Canned Meat,3,Shipping Company C,TN,USA,20060405,Wednesday,5,Weekday,April,2,2006,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,20060405.0,Wednesday,5.0,Weekday,April,2.0,2006.0,30,18.4,0,60,0,0,Credit Card,Closed,Invoiced


In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
order_key,bigint,
employee_key,bigint,
employee_last_name,string,
employee_first_name,string,
employee_job_title,string,
employee_company,string,
customer_key,bigint,
customer_last_name,string,
customer_first_name,string,


##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT customer_key AS CustomerID
  , customer_last_name AS LastName
  , customer_first_name AS FirstName
  , order_month_name AS OrderMonth
  , COUNT(product_key) AS ProductCount
FROM northwind_dlh.fact_orders_silver
GROUP BY CustomerID, LastName, FirstName, OrderMonth
ORDER BY ProductCount DESC

CustomerID,LastName,FirstName,OrderMonth,ProductCount
26,Liu,Run,April,3
8,Andersen,Elizabeth,April,3
6,Pérez-Olaeta,Francisco,June,3
9,Mortensen,Sven,April,2
6,Pérez-Olaeta,Francisco,April,2
11,Krschne,Peter,March,2
4,Lee,Christina,April,2
3,Axen,Thomas,April,2
10,Wacker,Roland,May,2
29,Lee,Soo Jung,April,1


In [0]:
%sql
SELECT pc.CustomerID
  , os.customer_last_name AS CustomerName
  , os.product_key AS ProductNumber
  , pc.ProductCount
FROM northwind_dlh.fact_orders_silver AS os
INNER JOIN (
  SELECT customer_key AS CustomerID
  , COUNT(product_key) AS ProductCount
  FROM northwind_dlh.fact_orders_silver
  GROUP BY customer_key
) AS pc
ON pc.CustomerID = os.customer_key
ORDER BY ProductCount DESC

CustomerID,CustomerName,ProductNumber,ProductCount
6,Pérez-Olaeta,34,5
6,Pérez-Olaeta,48,5
6,Pérez-Olaeta,7,5
6,Pérez-Olaeta,51,5
6,Pérez-Olaeta,48,5
8,Andersen,72,4
8,Andersen,8,4
8,Andersen,19,4
26,Liu,5,4
26,Liu,41,4


#### 7.0. Use AutoLoader to Process Streaming (Hot Path) Purchase Orders Fact Data 
##### 7.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Use spark.readStream and the AutoLoader to read in the JSON files in the "purchase_orders_stream_dir"
# directory and then create a TempView named "purchase_orders_raw_tempview".
# Be sure to set the "cloudFiles.schemaLocation" Option using the "purchase_orders_output_bronze" directory

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_purchase_order_key BIGINT")
 .option("cloudFiles.schemaHints", "purchase_order_key BIGINT")
 .option("cloudFiles.schemaHints", "supplier_key BIGINT")
 .option("cloudFiles.schemaHints", "product_key BIGINT") 
 .option("cloudFiles.schemaHints", "inventory_key BIGINT")
 .option("cloudFiles.schemaHints", "purchase_order_status STRING")
 .option("cloudFiles.schemaHints", "po_detail_posted_to_inventory BIGINT")
 .option("cloudFiles.schemaHints", "po_detail_unit_cost DECIMAL")
 .option("cloudFiles.schemaHints", "po_detail_quantity BIGINT") 
 .option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 .option("cloudFiles.schemaHints", "taxes DECIMAL")
 .option("cloudFiles.schemaHints", "payment_date DECIMAL")
 .option("cloudFiles.schemaHints", "payment_amount DECIMAL")
 .option("cloudFiles.schemaHints", "approved_by BIGINT")
 .option("cloudFiles.schemaHints", "submitted_by BIGINT")
 .option("cloudFiles.schemaHints", "submitted_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "creation_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "po_detail_date_received_key DECIMAL")
 .option("cloudFiles.schemaHints", "approved_date_key DECIMAL")
 .option("cloudFiles.schemaLocation", purchase_orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(purchase_orders_stream_dir)
 .createOrReplaceTempView("purchase_orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW purchase_orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM purchase_orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM purchase_orders_bronze_tempview

approved_by,approved_date_key,created_by,creation_date_key,fact_purchase_order_key,inventory_key,payment_amount,payment_date,po_detail_date_received_key,po_detail_posted_to_inventory,po_detail_quantity,po_detail_unit_cost,product_key,purchase_order_key,purchase_order_status,shipping_fee,submitted_by,submitted_date_key,supplier_key,taxes,_rescued_data,receipt_time,source_file
2.0,20060122.0,9.0,20060122.0,37,74.0,0,,20060122.0,1,100,10.0,48,100,Approved,0,9,20060114.0,2,0,,2023-04-27T17:29:00.692+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060122.0,2.0,20060122.0,38,72.0,0,,20060122.0,1,200,2.0,81,101,Approved,0,2,20060114.0,1,0,,2023-04-27T17:29:00.692+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,1.0,20060324.0,39,,0,,,0,300,34.0,43,102,Approved,0,1,20060324.0,1,0,,2023-04-27T17:29:00.692+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,1.0,20060324.0,40,111.0,0,,20060417.0,1,10,7.0,19,103,Approved,0,1,20060324.0,2,0,,2023-04-27T17:29:00.692+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,1.0,20060324.0,41,115.0,0,,20060406.0,1,50,7.0,41,104,Approved,0,1,20060324.0,2,0,,2023-04-27T17:29:00.692+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,7.0,20060324.0,42,100.0,0,,20060405.0,1,100,15.0,57,105,Approved,0,7,20060324.0,5,0,,2023-04-27T17:29:00.692+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,7.0,20060324.0,43,113.0,0,,20060405.0,1,50,26.0,72,106,Approved,0,7,20060324.0,6,0,,2023-04-27T17:29:00.692+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,6.0,20060324.0,44,107.0,0,,20060405.0,1,300,10.0,34,107,Approved,0,6,20060324.0,1,0,,2023-04-27T17:29:00.692+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,4.0,20060324.0,45,105.0,0,,20060405.0,1,25,30.0,8,108,Approved,0,4,20060324.0,2,0,,2023-04-27T17:29:00.692+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,4.0,20060324.0,46,109.0,0,,20060405.0,1,25,7.0,19,109,Approved,0,4,20060324.0,2,0,,2023-04-27T17:29:00.692+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json


In [0]:
(spark.table("purchase_orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_bronze"))

Out[78]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fc6e4b07400>

##### 7.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_purchase_orders_bronze")
  .createOrReplaceTempView("purchase_orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM purchase_orders_silver_tempview

approved_by,approved_date_key,created_by,creation_date_key,fact_purchase_order_key,inventory_key,payment_amount,payment_date,po_detail_date_received_key,po_detail_posted_to_inventory,po_detail_quantity,po_detail_unit_cost,product_key,purchase_order_key,purchase_order_status,shipping_fee,submitted_by,submitted_date_key,supplier_key,taxes,_rescued_data,receipt_time,source_file
2.0,20060122.0,9.0,20060122.0,37,74.0,0,,20060122.0,1,100,10.0,48,100,Approved,0,9,20060114.0,2,0,,2023-04-27T17:29:40.272+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060122.0,2.0,20060122.0,38,72.0,0,,20060122.0,1,200,2.0,81,101,Approved,0,2,20060114.0,1,0,,2023-04-27T17:29:40.272+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,1.0,20060324.0,39,,0,,,0,300,34.0,43,102,Approved,0,1,20060324.0,1,0,,2023-04-27T17:29:40.272+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,1.0,20060324.0,40,111.0,0,,20060417.0,1,10,7.0,19,103,Approved,0,1,20060324.0,2,0,,2023-04-27T17:29:40.272+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,1.0,20060324.0,41,115.0,0,,20060406.0,1,50,7.0,41,104,Approved,0,1,20060324.0,2,0,,2023-04-27T17:29:40.272+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,7.0,20060324.0,42,100.0,0,,20060405.0,1,100,15.0,57,105,Approved,0,7,20060324.0,5,0,,2023-04-27T17:29:40.272+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,7.0,20060324.0,43,113.0,0,,20060405.0,1,50,26.0,72,106,Approved,0,7,20060324.0,6,0,,2023-04-27T17:29:40.272+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,6.0,20060324.0,44,107.0,0,,20060405.0,1,300,10.0,34,107,Approved,0,6,20060324.0,1,0,,2023-04-27T17:29:40.272+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,4.0,20060324.0,45,105.0,0,,20060405.0,1,25,30.0,8,108,Approved,0,4,20060324.0,2,0,,2023-04-27T17:29:40.272+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json
2.0,20060404.0,4.0,20060324.0,46,109.0,0,,20060405.0,1,25,7.0,19,109,Approved,0,4,20060324.0,2,0,,2023-04-27T17:29:40.272+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/purchase_orders/Northwind_Fact_PurchaseOrders03.json


In [0]:
%sql
DESCRIBE EXTENDED purchase_orders_silver_tempview

col_name,data_type,comment
approved_by,bigint,
approved_date_key,"decimal(10,0)",
created_by,bigint,
creation_date_key,bigint,
fact_purchase_order_key,bigint,
inventory_key,bigint,
payment_amount,bigint,
payment_date,string,
po_detail_date_received_key,bigint,
po_detail_posted_to_inventory,bigint,


In [0]:
%sql
-- Create a new Temporary View named "purchase_orders_silver_tempview" by selecting data from
-- "purchase_orders_silver_tempview" and joining it to the Supplier, Product, Employee and Data dimension tables.
-- Remember that the Date dimension can serve as a "Role Playing" dimension by being Joined upon multiple times.

CREATE OR REPLACE TEMPORARY VIEW fact_purchase_orders_silver_tempview AS (
  SELECT po.fact_purchase_order_key,
      po.purchase_order_key,
      po.supplier_key,
      po.inventory_key,
      po.submitted_date_key,
      po.creation_date_key,
      po.po_detail_date_received_key,
      po.approved_date_key,
      e.employee_key,
      e.last_name AS employee_last_name,
      e.first_name AS employee_first_name,
      e.job_title AS employee_job_title,
      e.company AS employee_company,
      po.product_key,
      p.product_code,
      p.product_name,
      p.standard_cost AS product_standard_cost,
      p.list_price AS product_list_price,
      p.category AS product_category,
      od.day_name_of_week AS submitted_date_key_day_name_of_week,
      od.day_of_month AS submitted_date_key_day_of_month,
      od.weekday_weekend AS submitted_date_key_weekday_weekend,
      od.month_name AS submitted_date_key_month_name,
      od.calendar_quarter AS submitted_date_key_quarter,
      od.calendar_year AS submitted_date_key_year,
      pd.day_name_of_week AS creation_date_day_name_of_week,
      pd.day_of_month AS creation_date_day_of_month,
      pd.weekday_weekend AS creation_date_weekday_weekend,
      pd.month_name AS creation_date_month_name,
      pd.calendar_quarter AS creation_date_calendar_quarter,
      pd.calendar_year AS creation_date_calendar_year,
      sd.day_name_of_week AS date_received_day_name_of_week,
      sd.day_of_month AS date_received_day_of_month,
      sd.weekday_weekend AS date_received_weekday_weekend,
      sd.month_name AS date_received_month_name,
      sd.calendar_quarter AS date_received_calendar_quarter,
      sd.calendar_year AS date_received_calendar_year,
      dd.day_name_of_week AS approved_date_day_name_of_week,
      dd.day_of_month AS approved_date_day_of_month,
      dd.weekday_weekend AS approved_date_weekday_weekend,
      dd.month_name AS approved_date_month_name,
      dd.calendar_quarter AS approved_date_calendar_quarter,
      dd.calendar_year AS approved_date_calendar_year,
      po.po_detail_quantity,
      po.po_detail_unit_cost,
      po.payment_amount,
      po.shipping_fee,
      po.taxes,
      po.purchase_order_status
      
  FROM purchase_orders_silver_tempview AS po 
  INNER JOIN northwind_dlh.dim_employee AS e
  ON e.employee_key = po.created_by
  INNER JOIN northwind_dlh.dim_product AS p
  ON p.product_key = po.product_key
  INNER JOIN northwind_dlh.dim_supplier AS s
  ON s.supplier_key = po.supplier_key

  LEFT OUTER JOIN northwind_dlh.dim_date AS od
  ON od.date_key = po.submitted_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS pd
  ON pd.date_key = po.creation_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS sd
  ON sd.date_key = po.po_detail_date_received_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS dd
  ON sd.date_key = po.approved_date_key
)
    


In [0]:
(spark.table("fact_purchase_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_silver"))

Out[83]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fc6e5c36dc0>

In [0]:
%sql
SELECT * FROM fact_purchase_orders_silver

fact_purchase_order_key,purchase_order_key,supplier_key,inventory_key,submitted_date_key,creation_date_key,po_detail_date_received_key,approved_date_key,employee_key,employee_last_name,employee_first_name,employee_job_title,employee_company,product_key,product_code,product_name,product_standard_cost,product_list_price,product_category,submitted_date_key_day_name_of_week,submitted_date_key_day_of_month,submitted_date_key_weekday_weekend,submitted_date_key_month_name,submitted_date_key_quarter,submitted_date_key_year,creation_date_day_name_of_week,creation_date_day_of_month,creation_date_weekday_weekend,creation_date_month_name,creation_date_calendar_quarter,creation_date_calendar_year,date_received_day_name_of_week,date_received_day_of_month,date_received_weekday_weekend,date_received_month_name,date_received_calendar_quarter,date_received_calendar_year,approved_date_day_name_of_week,approved_date_day_of_month,approved_date_weekday_weekend,approved_date_month_name,approved_date_calendar_quarter,approved_date_calendar_year,po_detail_quantity,po_detail_unit_cost,payment_amount,shipping_fee,taxes,purchase_order_status
37,100,2,74,20060114,20060122,20060122,20060122,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,48,NWTCA-48,Northwind Traders Chocolate,9.5625,12.75,Candy,Saturday,14,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Saturday,1,Weekend,January,1,2000,100,10.0,0,0,0,Approved
37,100,2,74,20060114,20060122,20060122,20060122,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,48,NWTCA-48,Northwind Traders Chocolate,9.5625,12.75,Candy,Saturday,14,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,2,Weekend,January,1,2000,100,10.0,0,0,0,Approved
37,100,2,74,20060114,20060122,20060122,20060122,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,48,NWTCA-48,Northwind Traders Chocolate,9.5625,12.75,Candy,Saturday,14,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Monday,3,Weekday,January,1,2000,100,10.0,0,0,0,Approved
37,100,2,74,20060114,20060122,20060122,20060122,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,48,NWTCA-48,Northwind Traders Chocolate,9.5625,12.75,Candy,Saturday,14,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Tuesday,4,Weekday,January,1,2000,100,10.0,0,0,0,Approved
37,100,2,74,20060114,20060122,20060122,20060122,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,48,NWTCA-48,Northwind Traders Chocolate,9.5625,12.75,Candy,Saturday,14,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Wednesday,5,Weekday,January,1,2000,100,10.0,0,0,0,Approved
37,100,2,74,20060114,20060122,20060122,20060122,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,48,NWTCA-48,Northwind Traders Chocolate,9.5625,12.75,Candy,Saturday,14,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Thursday,6,Weekday,January,1,2000,100,10.0,0,0,0,Approved
37,100,2,74,20060114,20060122,20060122,20060122,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,48,NWTCA-48,Northwind Traders Chocolate,9.5625,12.75,Candy,Saturday,14,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Friday,7,Weekday,January,1,2000,100,10.0,0,0,0,Approved
37,100,2,74,20060114,20060122,20060122,20060122,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,48,NWTCA-48,Northwind Traders Chocolate,9.5625,12.75,Candy,Saturday,14,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Saturday,8,Weekend,January,1,2000,100,10.0,0,0,0,Approved
37,100,2,74,20060114,20060122,20060122,20060122,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,48,NWTCA-48,Northwind Traders Chocolate,9.5625,12.75,Candy,Saturday,14,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,9,Weekend,January,1,2000,100,10.0,0,0,0,Approved
37,100,2,74,20060114,20060122,20060122,20060122,9,Hellung-Larsen,Anne,Sales Representative,Northwind Traders,48,NWTCA-48,Northwind Traders Chocolate,9.5625,12.75,Candy,Saturday,14,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Sunday,22,Weekend,January,1,2006,Monday,10,Weekday,January,1,2000,100,10.0,0,0,0,Approved


In [0]:
%sql
DESCRIBE EXTENDED fact_purchase_orders_silver

col_name,data_type,comment
fact_purchase_order_key,bigint,
purchase_order_key,bigint,
supplier_key,bigint,
inventory_key,bigint,
submitted_date_key,bigint,
creation_date_key,bigint,
po_detail_date_received_key,bigint,
approved_date_key,"decimal(10,0)",
employee_key,int,
employee_last_name,string,


##### 7.3. Gold Table: Perform Aggregations

In [0]:
%sql
-- Author a query that returns the Total List Price grouped by Supplier and Product and sorted by Total List Price descending.
SELECT SUM(product_list_price) AS total_list_price
FROM northwind_dlh.fact_purchase_orders_silver
GROUP BY supplier_key, product_key
ORDER BY total_list_price DESC;

total_list_price
780620.0
687285.0
449705.0
356342.0
339400.0
330876.0
322392.0
295277.9999999528
254520.0
216342.0


#### 8.0. Use AutoLoader to Process Streaming (Hot Path) Inventory Transactions Fact Data 
##### 8.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Use spark.readStream and the AutoLoader to read in the JSON files in the "inventory_trans_stream_dir"
# directory and then create a TempView named "inventory_transactions_raw_tempview".
# Be sure to set the "cloudFiles.schemaLocation" Option using the "inventory_trans_output_bronze" directory
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 .option("cloudFiles.schemaHints", "order_key BIGINT")
 .option("cloudFiles.schemaHints", "employee_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT") 
 .option("cloudFiles.schemaHints", "product_key BIGINT")
 .option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 .option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 .option("cloudFiles.schemaHints", "quantity DECIMAL")
 .option("cloudFiles.schemaHints", "unit_price DECIMAL")
 .option("cloudFiles.schemaHints", "discount DECIMAL")
 .option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 .option("cloudFiles.schemaHints", "taxes DECIMAL")
 .option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 .option("cloudFiles.schemaHints", "payment_type STRING")
 .option("cloudFiles.schemaHints", "order_status STRING")
 .option("cloudFiles.schemaHints", "order_details_status STRING")
 .option("cloudFiles.schemaLocation", inventory_trans_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(inventory_trans_stream_dir)
 .createOrReplaceTempView("inventory_transactions_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW inventory_transactions_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM inventory_transactions_raw_tempview)

In [0]:
%sql
SELECT * FROM inventory_transactions_raw_tempview

fact_inventory_transaction_key,inventory_transaction_key,inventory_transaction_type,order_key,product_key,purchase_order_key,quantity,transaction_created_date_key,transaction_modified_date_key,order_details_status,_rescued_data
93,86,On Hold,,80,,20,20060324,20060324,,
94,87,On Hold,,81,,50,20060324,20060324,,
95,88,On Hold,,1,,25,20060324,20060324,,
96,89,On Hold,,43,,25,20060324,20060324,,
97,90,On Hold,,81,,25,20060324,20060324,,
98,96,On Hold,,34,,12,20060330,20060330,,
99,97,On Hold,,34,,10,20060330,20060330,,
100,98,On Hold,,34,,1,20060330,20060330,,
101,104,On Hold,,43,,300,20060404,20060404,,
68,106,Sold,,8,,25,20060404,20060404,,


In [0]:
(spark.table("inventory_transactions_bronze_tempview")
     .writeStream
     .format("delta")
     .option("checkpointLocation", f"{inventory_trans_output_bronze}/_checkpoint")
     .outputMode("append")
     .table("fact_inventory_transactions_bronze"))

Out[90]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fc6e5c3b6d0>

##### 8.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_inventory_transactions_bronze")
  .createOrReplaceTempView("inventory_transactions_silver_tempview"))

In [0]:
%sql
SELECT * FROM inventory_transactions_silver_tempview

fact_inventory_transaction_key,inventory_transaction_key,inventory_transaction_type,order_key,product_key,purchase_order_key,quantity,transaction_created_date_key,transaction_modified_date_key,order_details_status,_rescued_data,receipt_time,source_file
1,35,Purchased,,80,,75,20060322,20060322,,,2023-04-27T17:38:51.156+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/inventory_transactions/Northwind_Fact_InventoryTransactions01.json
2,36,Purchased,,72,,40,20060322,20060322,,,2023-04-27T17:38:51.156+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/inventory_transactions/Northwind_Fact_InventoryTransactions01.json
3,37,Purchased,,52,,100,20060322,20060322,,,2023-04-27T17:38:51.156+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/inventory_transactions/Northwind_Fact_InventoryTransactions01.json
4,38,Purchased,,56,,120,20060322,20060322,,,2023-04-27T17:38:51.156+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/inventory_transactions/Northwind_Fact_InventoryTransactions01.json
5,39,Purchased,,57,,80,20060322,20060322,,,2023-04-27T17:38:51.156+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/inventory_transactions/Northwind_Fact_InventoryTransactions01.json
6,40,Purchased,,6,,100,20060322,20060322,,,2023-04-27T17:38:51.156+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/inventory_transactions/Northwind_Fact_InventoryTransactions01.json
7,41,Purchased,,7,,40,20060322,20060322,,,2023-04-27T17:38:51.156+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/inventory_transactions/Northwind_Fact_InventoryTransactions01.json
8,42,Purchased,,8,,40,20060322,20060322,,,2023-04-27T17:38:51.156+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/inventory_transactions/Northwind_Fact_InventoryTransactions01.json
9,43,Purchased,,14,,40,20060322,20060322,,,2023-04-27T17:38:51.156+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/inventory_transactions/Northwind_Fact_InventoryTransactions01.json
10,44,Purchased,,17,,40,20060322,20060322,,,2023-04-27T17:38:51.156+0000,dbfs:/FileStore/ds2002-lab06/source_data/stream/inventory_transactions/Northwind_Fact_InventoryTransactions01.json


In [0]:
%sql
DESCRIBE EXTENDED inventory_transactions_silver_tempview

col_name,data_type,comment
fact_inventory_transaction_key,bigint,
inventory_transaction_key,bigint,
inventory_transaction_type,string,
order_key,string,
product_key,bigint,
purchase_order_key,string,
quantity,bigint,
transaction_created_date_key,bigint,
transaction_modified_date_key,bigint,
order_details_status,string,


In [0]:
%sql
-- Create a new Temporary View named "fact_inventory_transactions_silver_tempview" by selecting data from
-- "inventory_transactions_silver_tempview" and joining it to the Product and Data dimension tables.
-- Remember that the Date dimension can serve as a "Role Playing" dimension by being Joined upon multiple times.
CREATE OR REPLACE TEMPORARY VIEW fact_inventory_transactions_silver_tempview AS (
  SELECT it.fact_inventory_transaction_key,
      it.inventory_transaction_key,
		  it.product_key,
		  it.transaction_created_date_key,
		  it.transaction_modified_date_key,
		  it.inventory_transaction_type,
		  it.quantity,
		  it.purchase_order_key,
		  it.order_key,
      p.product_code,
      p.product_name,
      p.standard_cost AS product_standard_cost,
      p.list_price AS product_list_price,
      p.category AS product_category,
      od.day_name_of_week AS transaction_created_day_name_of_week,
      od.day_of_month AS transaction_created_day_of_month,
      od.weekday_weekend AS transaction_created_weekday_weekend,
      od.month_name AS transaction_created_month_name,
      od.calendar_quarter AS transaction_created_quarter,
      od.calendar_year AS transaction_created_year,
      pd.day_name_of_week AS transaction_modified_day_name_of_week,
      pd.day_of_month AS transaction_modified_day_of_month,
      pd.weekday_weekend AS transaction_modified_weekday_weekend,
      pd.month_name AS transaction_modified_month_name,
      pd.calendar_quarter AS transaction_modified_calendar_quarter,
      pd.calendar_year AS transaction_modified_calendar_year

  FROM inventory_transactions_silver_tempview AS it
  INNER JOIN northwind_dlh.dim_product AS p
  ON p.product_key = it.product_key
  
  LEFT OUTER JOIN northwind_dlh.dim_date AS od
  ON od.date_key = it.transaction_created_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS pd
  ON pd.date_key = it.transaction_modified_date_key
)

In [0]:
(spark.table("fact_inventory_transactions_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inventory_trans_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_transactions_silver"))

Out[95]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fc6e5be78e0>

In [0]:
%sql
SELECT * FROM fact_inventory_transactions_silver

fact_inventory_transaction_key,inventory_transaction_key,product_key,transaction_created_date_key,transaction_modified_date_key,inventory_transaction_type,quantity,purchase_order_key,order_key,product_code,product_name,product_standard_cost,product_list_price,product_category,transaction_created_day_name_of_week,transaction_created_day_of_month,transaction_created_weekday_weekend,transaction_created_month_name,transaction_created_quarter,transaction_created_year,transaction_modified_day_name_of_week,transaction_modified_day_of_month,transaction_modified_weekday_weekend,transaction_modified_month_name,transaction_modified_calendar_quarter,transaction_modified_calendar_year
1,35,80,20060322,20060322,Purchased,75,,,NWTDFN-80,Northwind Traders Dried Plums,3.0,3.5,Dried Fruit & Nuts,Wednesday,22,Weekday,March,1,2006,Wednesday,22,Weekday,March,1,2006
2,36,72,20060322,20060322,Purchased,40,,,NWTD-72,Northwind Traders Mozzarella,26.1,34.8,Dairy products,Wednesday,22,Weekday,March,1,2006,Wednesday,22,Weekday,March,1,2006
3,37,52,20060322,20060322,Purchased,100,,,NWTG-52,Northwind Traders Long Grain Rice,5.25,7.0,Grains,Wednesday,22,Weekday,March,1,2006,Wednesday,22,Weekday,March,1,2006
4,38,56,20060322,20060322,Purchased,120,,,NWTP-56,Northwind Traders Gnocchi,28.5,38.0,Pasta,Wednesday,22,Weekday,March,1,2006,Wednesday,22,Weekday,March,1,2006
5,39,57,20060322,20060322,Purchased,80,,,NWTP-57,Northwind Traders Ravioli,14.625,19.5,Pasta,Wednesday,22,Weekday,March,1,2006,Wednesday,22,Weekday,March,1,2006
6,40,6,20060322,20060322,Purchased,100,,,NWTJP-6,Northwind Traders Boysenberry Spread,18.75,25.0,"Jams, Preserves",Wednesday,22,Weekday,March,1,2006,Wednesday,22,Weekday,March,1,2006
7,41,7,20060322,20060322,Purchased,40,,,NWTDFN-7,Northwind Traders Dried Pears,22.5,30.0,Dried Fruit & Nuts,Wednesday,22,Weekday,March,1,2006,Wednesday,22,Weekday,March,1,2006
8,42,8,20060322,20060322,Purchased,40,,,NWTS-8,Northwind Traders Curry Sauce,30.0,40.0,Sauces,Wednesday,22,Weekday,March,1,2006,Wednesday,22,Weekday,March,1,2006
9,43,14,20060322,20060322,Purchased,40,,,NWTDFN-14,Northwind Traders Walnuts,17.4375,23.25,Dried Fruit & Nuts,Wednesday,22,Weekday,March,1,2006,Wednesday,22,Weekday,March,1,2006
10,44,17,20060322,20060322,Purchased,40,,,NWTCFV-17,Northwind Traders Fruit Cocktail,29.25,39.0,Canned Fruit & Vegetables,Wednesday,22,Weekday,March,1,2006,Wednesday,22,Weekday,March,1,2006


In [0]:
%sql
DESCRIBE EXTENDED fact_inventory_transactions_silver

col_name,data_type,comment
fact_inventory_transaction_key,bigint,
inventory_transaction_key,bigint,
product_key,bigint,
transaction_created_date_key,bigint,
transaction_modified_date_key,bigint,
inventory_transaction_type,string,
quantity,bigint,
purchase_order_key,string,
order_key,string,
product_code,string,


##### 8.3. Gold Table: Perform Aggregations

In [0]:
%sql
-- Author a query that returns the Total Quantity grouped by the Quarter Created, Inventory Transaction Type, and Product
-- Sort by the Total Quantity Descending
SELECT SUM(quantity) AS total_quantity
FROM northwind_dlh.fact_inventory_transactions_silver
GROUP BY transaction_created_quarter,inventory_transaction_type, product_name
ORDER BY total_quantity DESC

total_quantity
400
387
350
325
320
300
250
240
230
200


#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/ds2002-lab06/