## Lab 06: Data Lakehouse with Structured Streaming
This lab will help you learn to use many of the software libraries and programming techniques required to fulfill the requirements of the final end-of-session capstone project for course **DS-2002: Data Systems**. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "wna8fw-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "northwind_dw2"

connection_properties = {
  "user" : "jtupitza",
  "password" : "Passw0rd123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sandbox.zibbf"
atlas_database_name = "northwind_dw2"
atlas_user_name = "m001-student"
atlas_password = "Passw0rd1234"

mongo_uri = f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net/{atlas_database_name}"
print(mongo_uri)

# Data Files (JSON) Information ###############################
dst_database = "northwind_dlh"

base_dir = "dbfs:/FileStore/ds2002-lab06"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS northwind_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS northwind_dlh
COMMENT "DS-2002 Lab 06 Database"
LOCATION "dbfs:/FileStore/ds2002-lab06/northwind_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Lab 6.0");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wna8fw-mysql.mysql.database.azure.com:3306/northwind_dw2",
  dbtable "dim_date",
  user "jtupitza",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE northwind_dlh;

CREATE OR REPLACE TABLE northwind_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/northwind_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wna8fw-mysql.mysql.database.azure.com:3306/northwind_dw2",
  dbtable "dim_products",
  user "jtupitza",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE northwind_dlh;

CREATE OR REPLACE TABLE northwind_dlh.dim_product
COMMENT "Products Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/northwind_dlh/dim_product"
AS SELECT * FROM view_product

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_product;

col_name,data_type,comment
product_key,bigint,
product_code,string,
product_name,string,
standard_cost,double,
list_price,double,
reorder_level,bigint,
target_level,bigint,
quantity_per_unit,string,
discontinued,bigint,
minimum_reorder_quantity,double,


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_product LIMIT 5

product_key,product_code,product_name,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category
1,NWTB-1,Northwind Traders Chai,13.5,18.0,10,40,10 boxes x 20 bags,0,10.0,Beverages
3,NWTCO-3,Northwind Traders Syrup,7.5,10.0,25,100,12 - 550 ml bottles,0,25.0,Condiments
4,NWTCO-4,Northwind Traders Cajun Seasoning,16.5,22.0,10,40,48 - 6 oz jars,0,10.0,Condiments
5,NWTO-5,Northwind Traders Olive Oil,16.0125,21.35,10,40,36 boxes,0,10.0,Oil
6,NWTJP-6,Northwind Traders Boysenberry Spread,18.75,25.0,25,100,12 - 8 oz jars,0,25.0,"Jams, Preserves"


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-lab06/source_data/batch/Northwind_DimCustomers.json,Northwind_DimCustomers.json,10476,1681227123000
dbfs:/FileStore/ds2002-lab06/source_data/batch/Northwind_DimEmployees.csv,Northwind_DimEmployees.csv,2164,1681227123000
dbfs:/FileStore/ds2002-lab06/source_data/batch/Northwind_DimInvoices.json,Northwind_DimInvoices.json,6263,1681227123000
dbfs:/FileStore/ds2002-lab06/source_data/batch/Northwind_DimShippers.csv,Northwind_DimShippers.csv,262,1681227123000
dbfs:/FileStore/ds2002-lab06/source_data/batch/Northwind_DimSuppliers.json,Northwind_DimSuppliers.json,1480,1681227123000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002-lab06/source_data/batch'
json_files = {"customers" : 'Northwind_DimCustomers.json', "suppliers" : 'Northwind_DimSuppliers.json', "invoices" : 'Northwind_DimInvoices.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val atlas_uri = "mongodb+srv://m001-student:Passw0rd1234@sandbox.zibbf.mongodb.net/northwind_dw2"

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "northwind_dw2").option("collection", "customers")
.option("uri", "mongodb+srv://m001-student:Passw0rd1234@sandbox.zibbf.mongodb.net/northwind_dw2").load()
.select("customer_key","company","last_name","first_name","job_title","business_phone","fax_number","address","city","state_province","zip_postal_code","country_region")

display(df_customer)

customer_key,company,last_name,first_name,job_title,business_phone,fax_number,address,city,state_province,zip_postal_code,country_region
1,Company A,Bedecs,Anna,Owner,(123)555-0100,(123)555-0101,123 1st Street,Seattle,WA,99999,USA
2,Company B,Gratacos Solsona,Antonio,Owner,(123)555-0100,(123)555-0101,123 2nd Street,Boston,MA,99999,USA
3,Company C,Axen,Thomas,Purchasing Representative,(123)555-0100,(123)555-0101,123 3rd Street,Los Angelas,CA,99999,USA
4,Company D,Lee,Christina,Purchasing Manager,(123)555-0100,(123)555-0101,123 4th Street,New York,NY,99999,USA
5,Company E,O’Donnell,Martin,Owner,(123)555-0100,(123)555-0101,123 5th Street,Minneapolis,MN,99999,USA
6,Company F,Pérez-Olaeta,Francisco,Purchasing Manager,(123)555-0100,(123)555-0101,123 6th Street,Milwaukee,WI,99999,USA
7,Company G,Xie,Ming-Yang,Owner,(123)555-0100,(123)555-0101,123 7th Street,Boise,ID,99999,USA
8,Company H,Andersen,Elizabeth,Purchasing Representative,(123)555-0100,(123)555-0101,123 8th Street,Portland,OR,99999,USA
9,Company I,Mortensen,Sven,Purchasing Manager,(123)555-0100,(123)555-0101,123 9th Street,Salt Lake City,UT,99999,USA
10,Company J,Wacker,Roland,Purchasing Manager,(123)555-0100,(123)555-0101,123 10th Street,Chicago,IL,99999,USA


In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_customer

In [0]:
%sql
SELECT * FROM northwind_dlh.dim_customer LIMIT 5

##### 2.4.1 Fetch Supplier Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_supplier = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "northwind_dw2").option("collection", "suppliers").load()
.select("supplier_key","company","last_name","first_name","job_title")

display(df_supplier)

In [0]:
%scala
df_supplier.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Suppliers Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_supplier.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_supplier")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_supplier

In [0]:
%sql
SELECT * FROM northwind_dlh.dim_supplier LIMIT 5

##### 2.5.1 Fetch Invoice Dimension Data from teh New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_invoice = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "northwind_dw2").option("collection", "invoices").load()
.select("invoice_key","order_key","invoice_date","due_date","tax","shipping","amount_due")

display(df_invoice)

In [0]:
%scala
df_invoice.printSchema()

##### 2.5.2. Use the Spark DataFrame to Create a New Invoices Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_invoice.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_invoice")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_invoice

In [0]:
%sql
SELECT * FROM northwind_dlh.dim_invoice LIMIT 5

#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
employee_csv = f"{batch_dir}/Northwind_DimEmployees.csv"

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
display(df_employee)

In [0]:
df_employee.printSchema()

In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_employee;

In [0]:
%sql
SELECT * FROM northwind_dlh.dim_employee LIMIT 5;

##### 3.2 Use PySpark to Read Shipper Dimension Data from CSV File

In [0]:
shipper_csv = f"{batch_dir}/Northwind_DimShippers.csv"

df_shipper = spark.read.format('csv').options(header='true', inferSchema='true').load(shipper_csv)
display(df_shipper)

In [0]:
df_shipper.printSchema()

In [0]:
df_shipper.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_shipper")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_shipper;

In [0]:
%sql
SELECT * FROM northwind_dlh.dim_shipper LIMIT 5;

##### Verify Dimension Tables

In [0]:
%sql
USE northwind_dlh;
SHOW TABLES

### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 .option("cloudFiles.schemaHints", "order_key BIGINT")
 .option("cloudFiles.schemaHints", "employee_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT") 
 .option("cloudFiles.schemaHints", "product_key BIGINT")
 .option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 .option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 .option("cloudFiles.schemaHints", "quantity DECIMAL")
 .option("cloudFiles.schemaHints", "unit_price DECIMAL")
 .option("cloudFiles.schemaHints", "discount DECIMAL")
 .option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 .option("cloudFiles.schemaHints", "taxes DECIMAL")
 .option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 .option("cloudFiles.schemaHints", "payment_type STRING")
 .option("cloudFiles.schemaHints", "order_status STRING")
 .option("cloudFiles.schemaHints", "order_details_status STRING")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
      o.order_key,
      o.employee_key,
      e.last_name AS employee_last_name,
      e.first_name AS employee_first_name,
      e.job_title AS employee_job_title,
      e.company AS employee_company,
      o.customer_key,
      c.last_name AS customer_last_name,
      c.first_name AS customer_first_name,
      o.product_key,
      p.product_code,
      p.product_name,
      p.standard_cost AS product_standard_cost,
      p.list_price AS product_list_price,
      p.category AS product_category,
      o.shipper_key,
      s.company AS shipper_company,
      s.state_province AS shipper_state_province,
      s.country_region AS shipper_country_region,
      o.order_date_key,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year,
      o.paid_date_key,
      pd.day_name_of_week AS paid_day_name_of_week,
      pd.day_of_month AS paid_day_of_month,
      pd.weekday_weekend AS paid_weekday_weekend,
      pd.month_name AS paid_month_name,
      pd.calendar_quarter AS paid_calendar_quarter,
      pd.calendar_year AS paid_calendar_year,
      o.shipped_date_key,
      sd.day_name_of_week AS shipped_day_name_of_week,
      sd.day_of_month AS shipped_day_of_month,
      sd.weekday_weekend AS shipped_weekday_weekend,
      sd.month_name AS shipped_month_name,
      sd.calendar_quarter AS shipped_calendar_quarter,
      sd.calendar_year AS shipped_calendar_year,
      o.quantity,
      o.unit_price,
      o.discount,
      o.shipping_fee,
      o.taxes,
      o.tax_rate,
      o.payment_type,
      o.order_status,
      o.order_details_status
  FROM orders_silver_tempview AS o
  INNER JOIN northwind_dlh.dim_employee AS e
  ON e.employee_key = o.employee_key
  INNER JOIN northwind_dlh.dim_customer AS c
  ON c.customer_key = o.customer_key
  INNER JOIN northwind_dlh.dim_product AS p
  ON p.product_key = o.product_key
  INNER JOIN northwind_dlh.dim_shipper AS s
  ON s.shipper_key = o.shipper_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS od
  ON od.date_key = o.order_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS pd
  ON pd.date_key = o.paid_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS sd
  ON sd.date_key = o.shipped_date_key
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

In [0]:
%sql
SELECT * FROM fact_orders_silver

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.fact_orders_silver

##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT customer_key AS CustomerID
  , customer_last_name AS LastName
  , customer_first_name AS FirstName
  , order_month_name AS OrderMonth
  , COUNT(product_key) AS ProductCount
FROM northwind_dlh.fact_orders_silver
GROUP BY CustomerID, LastName, FirstName, OrderMonth
ORDER BY ProductCount DESC

In [0]:
%sql
SELECT pc.CustomerID
  , os.customer_last_name AS CustomerName
  , os.product_key AS ProductNumber
  , pc.ProductCount
FROM northwind_dlh.fact_orders_silver AS os
INNER JOIN (
  SELECT customer_key AS CustomerID
  , COUNT(product_key) AS ProductCount
  FROM northwind_dlh.fact_orders_silver
  GROUP BY customer_key
) AS pc
ON pc.CustomerID = os.customer_key
ORDER BY ProductCount DESC

#### 7.0. Use AutoLoader to Process Streaming (Hot Path) Purchase Orders Fact Data 
##### 7.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_purchase_order_key BIGINT")
 .option("cloudFiles.schemaHints", "purchase_order_key BIGINT")
 .option("cloudFiles.schemaHints", "supplier_key BIGINT")
 .option("cloudFiles.schemaHints", "created_by BIGINT") 
 .option("cloudFiles.schemaHints", "submitted_date STRING")
 .option("cloudFiles.schemaHints", "creation_date STRING")
 .option("cloudFiles.schemaHints", "status STRING")
 .option("cloudFiles.schemaHints", "expected_date STRING")
 .option("cloudFiles.schemaHints", "shipping_fee DECIMAL") 
 .option("cloudFiles.schemaHints", "taxes DECIMAL")
 .option("cloudFiles.schemaHints", "payment_date STRING")
 .option("cloudFiles.schemaHints", "payment_amount DECIMAL")
 .option("cloudFiles.schemaHints", "payment_method STRING")
 .option("cloudFiles.schemaHints", "approved_by DECIMAL")
 .option("cloudFiles.schemaHints", "approved_date STRING")
 .option("cloudFiles.schemaHints", "submitted_by BIGINT")
 .option("cloudFiles.schemaHints", "purchase_order_detail_key BIGINT")
 .option("cloudFiles.schemaHints", "product_key BIGINT")
 .option("cloudFiles.schemaHints", "quantity DECIMAL")
 .option("cloudFiles.schemaHints", "unit_cost DECIMAL")
 .option("cloudFiles.schemaHints", "date_received STRING")
 .option("cloudFiles.schemaHints", "posted_to_inventory BIGINT")
 .option("cloudFiles.schemaHints", "inventory_key DECIMAL")
 .option("cloudFiles.schemaLocation", purchase_orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(purchase_orders_stream_dir)
 .createOrReplaceTempView("purchase_orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW purchase_orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM purchase_orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM purchase_orders_bronze_tempview

In [0]:
(spark.table("purchase_orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_bronze"))

##### 7.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_purchase_orders_bronze")
  .createOrReplaceTempView("purchase_orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM purchase_orders_silver_tempview

In [0]:
%sql
DESCRIBE EXTENDED purchase_orders_silver_tempview

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_purchase_orders_silver_tempview AS (
  SELECT po.fact_purchase_order_key,
      po.purchase_order_key,
      --po.supplier_key,
      s.company AS supplier_company,
      --po.product_key,
      p.product_name,
      p.list_price,
      po.inventory_key,
      --po.created_by,
      ce.first_name AS created_by_first_name,
      ce.last_name AS created_by_last_name,
      ce.job_title AS creator_job_title,
      ce.company AS creator_company,
      po.creation_date_key,
      cd.day_name_of_week AS creation_day_name_of_week,
      cd.day_of_month AS creation_day_of_month,
      cd.weekday_weekend AS creation_weekday_weekend,
      cd.month_name AS creation_month_name,
      cd.calendar_quarter AS creation_quarter,
      cd.calendar_year AS creation_year,
      po.purchase_order_status,
      po.po_detail_posted_to_inventory,
      po.po_detail_unit_cost,
      po.po_detail_quantity,
      po.shipping_fee,
      po.taxes,
      po.payment_date,
      po.payment_amount,
      --po.submitted_by,
      se.first_name AS submitted_by_first_name,
      se.last_name AS submitted_by_last_name,
      se.job_title AS submitter_job_title,
      se.company AS submitter_company,
      po.submitted_date_key,
      sd.day_name_of_week AS po_submitted_day_name_of_week,
      sd.day_of_month AS po_submitted_day_of_month,
      sd.weekday_weekend AS po_submitted_weekday_weekend,
      sd.month_name AS po_submitted_month_name,
      sd.calendar_quarter AS po_submitted_quarter,
      sd.calendar_year AS po_submitted_year,
      po.po_detail_date_received_key,
      rd.day_name_of_week AS po_detail_received_day_name_of_week,
      rd.day_of_month AS po_detail_received_day_of_month,
      rd.weekday_weekend AS po_detail_received_weekday_weekend,
      rd.month_name AS po_detail_received_month_name,
      rd.calendar_quarter AS po_detail_received_quarter,
      rd.calendar_year AS po_detail_received_year,     
      --po.approved_by,
      ae.first_name AS approved_by_first_name,
      ae.last_name AS approved_by_last_name,
      ae.job_title AS approver_job_title,
      ae.company AS approver_company,     
      po.approved_date_key,
      ad.day_name_of_week AS approved_day_name_of_week,
      ad.day_of_month AS approved_day_of_month,
      ad.weekday_weekend AS approved_weekday_weekend,
      ad.month_name AS approved_month_name,
      ad.calendar_quarter AS approved_quarter,
      ad.calendar_year AS approved_year
  FROM purchase_orders_silver_tempview AS po
  INNER JOIN northwind_dlh.dim_supplier AS s
  ON s.supplier_key = po.supplier_key
  INNER JOIN northwind_dlh.dim_product AS p
  ON p.product_key = po.product_key
  INNER JOIN northwind_dlh.dim_employee AS ce
  ON ce.employee_key = po.created_by
  INNER JOIN northwind_dlh.dim_employee AS ae
  ON ae.employee_key = po.approved_by
  INNER JOIN northwind_dlh.dim_employee AS se
  ON se.employee_key = po.submitted_by
  LEFT OUTER JOIN northwind_dlh.dim_date AS sd
  ON sd.date_key = po.submitted_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS cd
  ON cd.date_key = po.creation_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS rd
  ON rd.date_key = po.po_detail_date_received_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS ad
  ON ad.date_key = po.approved_date_key
)

In [0]:
(spark.table("fact_purchase_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_silver"))

In [0]:
%sql
SELECT * FROM fact_purchase_orders_silver

In [0]:
%sql
DESCRIBE EXTENDED fact_purchase_orders_silver

##### 7.3. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT supplier_company AS Supplier
    , product_name AS Product
    , SUM(list_price) AS TotalListPrice
FROM northwind_dlh.fact_purchase_orders_silver
GROUP BY Supplier, Product
ORDER BY TotalListPrice DESC

#### 8.0. Use AutoLoader to Process Streaming (Hot Path) Inventory Transactions Fact Data 
##### 8.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_inventory_transaction_key BIGINT")
 .option("cloudFiles.schemaHints", "inventory_transaction_key BIGINT")
 .option("cloudFiles.schemaHints", "product_key BIGINT")
 .option("cloudFiles.schemaHints", "transaction_created_date_key BIGINT") 
 .option("cloudFiles.schemaHints", "transaction_modified_date_key BIGINT")
 .option("cloudFiles.schemaHints", "inventory_transaction_type STRING")
 .option("cloudFiles.schemaHints", "quantity BIGINT")
 .option("cloudFiles.schemaHints", "purchase_order_key STRING")
 .option("cloudFiles.schemaHints", "order_key STRING") 
 .option("cloudFiles.schemaLocation", inventory_trans_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(inventory_trans_stream_dir)
 .createOrReplaceTempView("inventory_transactions_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW inventory_transactions_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM inventory_transactions_raw_tempview
)

In [0]:
%sql
SELECT * FROM inventory_transactions_raw_tempview

In [0]:
(spark.table("inventory_transactions_bronze_tempview")
     .writeStream
     .format("delta")
     .option("checkpointLocation", f"{inventory_trans_output_bronze}/_checkpoint")
     .outputMode("append")
     .table("fact_inventory_transactions_bronze"))

##### 8.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_inventory_transactions_bronze")
  .createOrReplaceTempView("inventory_transactions_silver_tempview"))

In [0]:
%sql
SELECT * FROM inventory_transactions_silver_tempview

In [0]:
%sql
DESCRIBE EXTENDED inventory_transactions_silver_tempview

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_inventory_transactions_silver_tempview AS (
  SELECT it.fact_inventory_transaction_key
      , it.product_key
      , p.product_name
      , it.transaction_created_date_key
      , cd.day_name_of_week AS created_day_name_of_week
      , cd.day_of_month AS created_day_of_month
      , cd.weekday_weekend AS created_weekday_weekend
      , cd.month_name AS created_month_name
      , cd.calendar_quarter AS created_quarter
      , cd.calendar_year AS created_year
      , it.transaction_modified_date_key
      , md.day_name_of_week AS modified_day_name_of_week
      , md.day_of_month AS modified_day_of_month
      , md.weekday_weekend AS modified_weekday_weekend
      , md.month_name AS modified_month_name
      , md.calendar_quarter AS modified_quarter
      , md.calendar_year AS modified_year
      , it.inventory_transaction_type
      , it.quantity
  FROM inventory_transactions_silver_tempview AS it
  LEFT OUTER JOIN northwind_dlh.dim_product AS p
  ON p.product_key = it.product_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS md
  ON md.date_key = it.transaction_modified_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS cd
  ON cd.date_key = it.transaction_created_date_key
)

In [0]:
(spark.table("fact_inventory_transactions_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inventory_trans_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_transactions_silver"))

In [0]:
%sql
SELECT * FROM fact_inventory_transactions_silver

In [0]:
%sql
DESCRIBE EXTENDED fact_inventory_transactions_silver

##### 8.3. Gold Table: Perform Aggregations

In [0]:
%sql
CREATE OR REPLACE TABLE northwind_dlh.fact_inventory_transactions_gold AS (
  SELECT created_quarter AS QuarterCreated
      , inventory_transaction_type AS Transaction
      , product_name AS Product
      , SUM(quantity) AS TotalQuantity
  FROM northwind_dlh.fact_inventory_transactions_silver
  GROUP BY QuarterCreated, Transaction, Product
  ORDER BY TotalQuantity DESC);
  
SELECT * FROM northwind_dlh.fact_inventory_transactions_gold

#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/ds2002-lab06/