## Lab 06: Data Lakehouse with Structured Streaming
This lab will help you learn to use many of the software libraries and programming techniques required to fulfill the requirements of the final end-of-session capstone project for course **DS-2002: Data Systems**. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
%python
%pip install pymongo

import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

Collecting pymongo
  Obtaining dependency information for pymongo from https://files.pythonhosted.org/packages/9a/16/dbffca9d4ad66f2a325c280f1177912fa23235987f7b9033e283da889b7a/pymongo-4.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading pymongo-4.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Obtaining dependency information for dnspython<3.0.0,>=1.16.0 from https://files.pythonhosted.org/packages/68/1b/e0a87d256e40e8c888847551b20a017a6b98139178505dc7ffb96f04e954/dnspython-2.7.0-py3-none-any.whl.metadata
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.7 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.7 MB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/1.7 MB[0m

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "wna8fw-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "northwind_dw2"

connection_properties = {
  "user" : "root",
  "password" : "Luckylemons12$",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0"
atlas_database_name = "northwind_dw2"
atlas_user_name = "jsq4xr"
atlas_password = "5DltCXGlClkT1XtZ"

# Data Files (JSON) Information ###############################
dst_database = "northwind_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = "mongodb+srv://jsq4xr:5DltCXGlClkT1XtZ@cluster0.laoqh.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS northwind_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS northwind_dlh
COMMENT "DS-2002 Lab 06 Database"
LOCATION "dbfs:/FileStore/lab_data/northwind_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Lab 6.0");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wna8fw-mysql.mysql.database.azure.com:3306/northwind_dw2", --Replace with your Server Name
  dbtable "dim_date",
  user "jtupitza",    --Replace with your User Name
  password "Passw0rd123"  --Replace with you password
)

In [0]:
%sql
USE DATABASE northwind_dlh;

CREATE OR REPLACE TABLE northwind_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/northwind_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_product" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wna8fw-mysql.mysql.database.azure.com:3306/northwind_dw2",
  dbtable "dim_products", 
  user "jtupitza",    --Replace with your User Name
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE northwind_dlh;

-- Create a new table named "northwind_dlh.dim_product" using data from the view named "view_product"
CREATE OR REPLACE TABLE northwind_dlh.dim_product 
COMMENT "Product Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/northwind_dlh/dim_product"
AS SELECT * FROM view_product

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_product;

col_name,data_type,comment
product_key,bigint,
product_id,bigint,
product_code,varchar(65535),
product_name,varchar(65535),
standard_cost,double,
list_price,double,
reorder_level,bigint,
target_level,bigint,
quantity_per_unit,varchar(65535),
discontinued,bigint,


In [0]:
%sql
SELECT * FROM northwind_dlh.dim_product LIMIT 5

product_key,product_id,product_code,product_name,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category
1,1,NWTB-1,Northwind Traders Chai,13.5,18.0,10,40,10 boxes x 20 bags,0,10.0,Beverages
2,3,NWTCO-3,Northwind Traders Syrup,7.5,10.0,25,100,12 - 550 ml bottles,0,25.0,Condiments
3,4,NWTCO-4,Northwind Traders Cajun Seasoning,16.5,22.0,10,40,48 - 6 oz jars,0,10.0,Condiments
4,5,NWTO-5,Northwind Traders Olive Oil,16.0125,21.35,10,40,36 boxes,0,10.0,Oil
5,6,NWTJP-6,Northwind Traders Boysenberry Spread,18.75,25.0,25,100,12 - 8 oz jars,0,25.0,"Jams, Preserves"


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/lab_data/retail/batch'

path,name,size,modificationTime
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimCustomers-1.json,Northwind_DimCustomers-1.json,10476,1732738833000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimCustomers-2.json,Northwind_DimCustomers-2.json,10476,1732738909000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimCustomers.json,Northwind_DimCustomers.json,10476,1732729125000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimEmployees-1.csv,Northwind_DimEmployees-1.csv,2164,1732738833000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimEmployees-2.csv,Northwind_DimEmployees-2.csv,2164,1732738909000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimEmployees.csv,Northwind_DimEmployees.csv,2164,1732729125000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimInvoices-1.json,Northwind_DimInvoices-1.json,6263,1732738833000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimInvoices-2.json,Northwind_DimInvoices-2.json,6263,1732738909000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimInvoices.json,Northwind_DimInvoices.json,6263,1732729125000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimShippers-1.csv,Northwind_DimShippers-1.csv,262,1732738833000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
uri = "mongodb+srv://jsq4xr:5DltCXGlClkT1XtZ@cluster0.laoqh.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))
# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


In [0]:
source_dir = '/dbfs/FileStore/lab_data/retail/batch'
json_files = {"customers" : 'Northwind_DimCustomers.json'
              , "suppliers" : 'Northwind_DimSuppliers.json'
              , "invoices" : 'Northwind_DimInvoices.json'}

set_mongo_collection(atlas_user_name, atlas_password, "Cluster0", atlas_database_name, source_dir, json_files) 

InsertManyResult([ObjectId('674798b10240b2ac269af7dc'), ObjectId('674798b10240b2ac269af7dd'), ObjectId('674798b10240b2ac269af7de'), ObjectId('674798b10240b2ac269af7df'), ObjectId('674798b10240b2ac269af7e0'), ObjectId('674798b10240b2ac269af7e1'), ObjectId('674798b10240b2ac269af7e2'), ObjectId('674798b10240b2ac269af7e3'), ObjectId('674798b10240b2ac269af7e4'), ObjectId('674798b10240b2ac269af7e5'), ObjectId('674798b10240b2ac269af7e6'), ObjectId('674798b10240b2ac269af7e7'), ObjectId('674798b10240b2ac269af7e8'), ObjectId('674798b10240b2ac269af7e9'), ObjectId('674798b10240b2ac269af7ea'), ObjectId('674798b10240b2ac269af7eb'), ObjectId('674798b10240b2ac269af7ec'), ObjectId('674798b10240b2ac269af7ed'), ObjectId('674798b10240b2ac269af7ee'), ObjectId('674798b10240b2ac269af7ef'), ObjectId('674798b10240b2ac269af7f0'), ObjectId('674798b10240b2ac269af7f1'), ObjectId('674798b10240b2ac269af7f2'), ObjectId('674798b10240b2ac269af7f3'), ObjectId('674798b10240b2ac269af7f4'), ObjectId('674798b10240b2ac269af7

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
print(spark.version)

3.3.0


In [0]:
from pyspark.sql import SparkSession

# MongoDB URI and other configurations
mongo_uri = "mongodb+srv://jsq4xr:5DltCXGlClkT1XtZ@cluster0.laoqh.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

# Initialize Spark session with MongoDB configurations
spark = SparkSession.builder \
    .appName("MongoDBSparkConnectorExample") \
    .config("spark.mongodb.input.uri", mongo_uri) \
    .config("spark.mongodb.output.uri", mongo_uri) \
    .getOrCreate()

# Read data from MongoDB (replace 'your_database.your_collection' with actual values)
df = spark.read.format("mongo").option("uri", mongo_uri).load()

# Show the data to confirm the connection is working
df.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
[0;32m<command-924093559460666>[0m in [0;36m<cell line: 14>[0;34m()[0m
[1;32m     12[0m [0;34m[0m[0m
[1;32m     13[0m [0;31m# Read data from MongoDB (replace 'your_database.your_collection' with actual values)[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 14[0;31m [0mdf[0m [0;34m=[0m [0mspark[0m[0;34m.[0m[0mread[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"mongo"[0m[0;34m)[0m[0;34m.[0m[0moption[0m[0;34m([0m[0;34m"uri"[0m[0;34m,[0m [0mmongo_uri[0m[0;34m)[0m[0;34m.[0m[0mload[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     15[0m [0;34m[0m[0m
[1;32m     16[0m [0;31m# Show the data to confirm the connection is working[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, *

In [0]:
%scala
import com.mongodb.spark.sql._

val userName = "jsq4xr"
val pwd = "5DltCXGlClkT1XtZ"
val clusterName = "Cluster0"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "northwind_dw2")
.option("collection", "customers").load()
.select("customer_key","company","last_name","first_name","job_title","business_phone","fax_number","address","city","state_province","zip_postal_code","country_region")

display(df_customer)

In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_customer

org.apache.spark.sql.catalyst.ExtendedAnalysisException: [DELTA_PATH_DOES_NOT_EXIST] dbfs:/FileStore/lab_data/northwind_dlh/dim_customer doesn't exist, or is not a Delta table.;
DescribeRelation true, [col_name#8339, data_type#8340, comment#8341]
+- ResolvedTable com.databricks.sql.managedcatalog.UnityCatalogV2Proxy@63652ae6, northwind_dlh.dim_customer, DeltaTableV2(org.apache.spark.sql.SparkSession@7298c35,dbfs:/FileStore/lab_data/northwind_dlh/dim_customer,Some(CatalogTable(
Catalog: hive_metastore
Database: northwind_dlh
Table: dim_customer
Owner: root
Created Time: Wed Nov 27 22:03:38 UTC 2024
Last Access: UNKNOWN
Created By: Spark 3.5.0
Type: MANAGED
Provider: delta
Table Properties: [delta.enableDeletionVectors=true, delta.feature.deletionVectors=supported, delta.lastCommitTimestamp=1732745017000, delta.lastUpdateVersion=0, delta.minReaderVersion=3, delta.minWriterVersion=7]
Location: dbfs:/FileStore/lab_data/northwind_dlh/dim_customer
Serde Library: org.apache.hadoop.hive.serde2

In [0]:
%sql
SELECT * FROM northwind_dlh.dim_customer LIMIT 5

##### 2.4.1 Fetch Supplier Dimension Data from the New MongoDB Collection

In [0]:
%scala
val df_supplier = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "northwind_dw2")
.option("collection", "suppliers").load()
.select("supplier_key","company","contact_name","contact_title","address","city","state_province","zip_postal_code","country_region")

df_supplier.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_supplier")

In [0]:
%scala
df_supplier.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Suppliers Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
shipper_csv = f"{batch_dir}/Northwind_DimShippers.csv"

df_shipper = spark.read.format('csv').options(header='true', inferSchema='true').load(shipper_csv)
df_shipper.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_shipper")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_supplier

In [0]:
%sql
SELECT * FROM northwind_dlh.dim_supplier LIMIT 5

##### 2.5.1 Fetch Invoice Dimension Data from teh New MongoDB Collection

In [0]:
%scala
val df_invoice = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "northwind_dw2")
.option("collection", "invoices").load()
.select("invoice_key",
        "invoice_date_key",
        "invoice_number", 
        "due_date_key",
        "payment_date_key",
        "order_key",
        "customer_key",
        "shipper_key",
        "product_key",
        "shipping_fee",
        "taxes",
        "payment_type",
        "paid_date_key",
        "notes",
        "tax_rate",
        "freight",
        "order_date_key")

display(df_invoice)

In [0]:
%scala
df_invoice.printSchema()

##### 2.5.2. Use the Spark DataFrame to Create a New Invoices Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
val df_invoice = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "northwind_dw2")
.option("collection", "invoices").load()
.select("invoice_key",
        "invoice_date_key",
        "invoice_number", 
        "due_date_key",
        "payment_date_key",
        "order_key",
        "customer_key",
        "shipper_key",
        "product_key",
        "shipping_fee",
        "taxes",
        "payment_type",
        "paid_date_key",
        "notes",
        "tax_rate",
        "freight",
        "order_date_key")

df_invoice.write.format("delta")
  .mode("overwrite")
  .saveAsTable("northwind_dlh.dim_invoice")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_invoice

In [0]:
%sql
SELECT * FROM northwind_dlh.dim_invoice LIMIT 5

#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
employee_csv = f"{batch_dir}/Northwind_DimEmployees.csv"

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
display(df_employee)

In [0]:
df_employee.printSchema()

In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("northwind_dlh.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_employee;

In [0]:
%sql
SELECT * FROM northwind_dlh.dim_employee LIMIT 5;

##### 3.2 Use PySpark to Read Shipper Dimension Data from CSV File

In [0]:
shipper_csv = f"{batch_dir}/Northwind_DimShippers.csv"

df_shipper = spark.read.format('csv') \
    .options(header='true', inferSchema='true') \
    .load(shipper_csv)

In [0]:
df_shipper.printSchema()

In [0]:
df_shipper.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("northwind_dlh.dim_shipper")

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.dim_shipper;

In [0]:
%sql
SELECT * FROM northwind_dlh.dim_shipper LIMIT 5;

##### Verify Dimension Tables

In [0]:
%sql
USE northwind_dlh;
SHOW TABLES

### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 #.option("cloudFiles.schemaHints", "order_key BIGINT")
 #.option("cloudFiles.schemaHints", "employee_key BIGINT")
 #.option("cloudFiles.schemaHints", "customer_key BIGINT") 
 #.option("cloudFiles.schemaHints", "product_key BIGINT")
 #.option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 #.option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 #.option("cloudFiles.schemaHints", "quantity DECIMAL")
 #.option("cloudFiles.schemaHints", "unit_price DECIMAL")
 #.option("cloudFiles.schemaHints", "discount DECIMAL")
 #.option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 #.option("cloudFiles.schemaHints", "taxes DECIMAL")
 #.option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 #.option("cloudFiles.schemaHints", "payment_type STRING")
 #.option("cloudFiles.schemaHints", "order_status STRING")
 #.option("cloudFiles.schemaHints", "order_details_status STRING")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
      o.order_key,
      o.employee_key,
      e.last_name AS employee_last_name,
      e.first_name AS employee_first_name,
      e.job_title AS employee_job_title,
      e.company AS employee_company,
      o.customer_key,
      c.last_name AS customer_last_name,
      c.first_name AS customer_first_name,
      o.product_key,
      p.product_code,
      p.product_name,
      p.standard_cost AS product_standard_cost,
      p.list_price AS product_list_price,
      p.category AS product_category,
      o.shipper_key,
      s.company AS shipper_company,
      s.state_province AS shipper_state_province,
      s.country_region AS shipper_country_region,
      o.order_date_key,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year,
      o.paid_date_key,
      pd.day_name_of_week AS paid_day_name_of_week,
      pd.day_of_month AS paid_day_of_month,
      pd.weekday_weekend AS paid_weekday_weekend,
      pd.month_name AS paid_month_name,
      pd.calendar_quarter AS paid_calendar_quarter,
      pd.calendar_year AS paid_calendar_year,
      o.shipped_date_key,
      sd.day_name_of_week AS shipped_day_name_of_week,
      sd.day_of_month AS shipped_day_of_month,
      sd.weekday_weekend AS shipped_weekday_weekend,
      sd.month_name AS shipped_month_name,
      sd.calendar_quarter AS shipped_calendar_quarter,
      sd.calendar_year AS shipped_calendar_year,
      o.quantity,
      o.unit_price,
      o.discount,
      o.shipping_fee,
      o.taxes,
      o.tax_rate,
      o.payment_type,
      o.order_status,
      o.order_details_status
  FROM orders_silver_tempview AS o
  INNER JOIN northwind_dlh.dim_employee AS e
  ON e.employee_key = o.employee_key
  INNER JOIN northwind_dlh.dim_customer AS c
  ON c.customer_key = o.customer_key
  INNER JOIN northwind_dlh.dim_product AS p
  ON p.product_key = o.product_key
  INNER JOIN northwind_dlh.dim_shipper AS s
  ON s.shipper_key = o.shipper_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS od
  ON od.date_key = o.order_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS pd
  ON pd.date_key = o.paid_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS sd
  ON sd.date_key = o.shipped_date_key
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

In [0]:
%sql
SELECT * FROM fact_orders_silver

In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.fact_orders_silver

##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. The table should include the number of products sold per customer each Month, along with the Customers' ID, First & Last Name, and the Month in which the order was placed.

In [0]:
%sql
CREATE OR REPLACE TABLE northwind_dlh.fact_monthly_orders_by_customer_gold AS (
  SELECT customer_key AS CustomerID
    , customer_last_name AS LastName
    , customer_first_name AS FirstName
    , order_month_name AS OrderMonth
    , COUNT(product_key) AS ProductCount
  FROM northwind_dlh.fact_orders_silver
  GROUP BY CustomerID, LastName, FirstName, OrderMonth
  ORDER BY ProductCount DESC);

SELECT * FROM northwind_dlh.fact_monthly_orders_by_customer_gold;

In [0]:
%sql
CREATE OR REPLACE TABLE northwind_dlh.fact_product_orders_by_customer_gold AS (
  SELECT pc.CustomerID
    , os.customer_last_name AS CustomerName
    , os.product_key AS ProductNumber
    , pc.ProductCount
  FROM northwind_dlh.fact_orders_silver AS os
  INNER JOIN (
    SELECT customer_key AS CustomerID
    , COUNT(product_key) AS ProductCount
    FROM northwind_dlh.fact_orders_silver
    GROUP BY customer_key
  ) AS pc
  ON pc.CustomerID = os.customer_key
  ORDER BY ProductCount DESC);

SELECT * FROM northwind_dlh.fact_product_orders_by_customer_gold;

#### 7.0. Use AutoLoader to Process Streaming (Hot Path) Purchase Orders Fact Data 
##### 7.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Use spark.readStream and the AutoLoader to read in the JSON files in the "purchase_orders_stream_dir"
# directory and then create a TempView named "purchase_orders_raw_tempview".
# Be sure to set the "cloudFiles.schemaLocation" Option using the "purchase_orders_output_bronze" directory
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", purchase_orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(purchase_orders_stream_dir)
 .createOrReplaceTempView("purchase_orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */

In [0]:
%sql
SELECT * FROM purchase_orders_bronze_tempview

In [0]:
(spark.table("purchase_orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_bronze"))

##### 7.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_purchase_orders_bronze")
  .createOrReplaceTempView("purchase_orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM purchase_orders_silver_tempview

In [0]:
%sql
DESCRIBE EXTENDED purchase_orders_silver_tempview

In [0]:
%sql
-- Create a new Temporary View named "purchase_orders_silver_tempview" by selecting data from
-- "purchase_orders_silver_tempview" and joining it to the Supplier, Product, Employee and Date dimension tables.
-- Remember that the Date dimension can serve as a "Role Playing" dimension by being Joined upon multiple times.
CREATE OR REPLACE TEMPORARY VIEW fact_purchase_orders_silver_tempview AS (
  SELECT po.purchase_order_key,
      po.employee_key,
      e.last_name AS employee_last_name,
      e.first_name AS employee_first_name,
      po.supplier_key,
      s.company AS supplier_company,
      po.product_key,
      p.product_name,
      p.standard_cost,
      p.list_price,
      po.submitted_date_key,
      sd.month_name AS submitted_month,
      sd.calendar_quarter AS submitted_quarter,
      po.creation_date_key,
      cd.month_name AS creation_month,
      cd.calendar_quarter AS creation_quarter,
      po.status_key,
      po.expected_date_key,
      po.shipping_fee,
      po.taxes,
      po.payment_date_key,
      po.payment_amount,
      po.payment_method,
      po.notes
  FROM purchase_orders_silver_tempview AS po
  INNER JOIN northwind_dlh.dim_employee AS e
  ON e.employee_key = po.employee_key
  INNER JOIN northwind_dlh.dim_supplier AS s 
  ON s.supplier_key = po.supplier_key
  INNER JOIN northwind_dlh.dim_product AS p
  ON p.product_key = po.product_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS sd
  ON sd.date_key = po.submitted_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS cd
  ON cd.date_key = po.creation_date_key
)

In [0]:
(spark.table("fact_purchase_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_silver"))

In [0]:
%sql
SELECT * FROM fact_purchase_orders_silver

In [0]:
%sql
DESCRIBE EXTENDED fact_purchase_orders_silver

##### 7.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. The table should include the total amount (total list price) of the purchase orders placed per Supplier for each Product. Include the Suppliers' Company Name, and the Product Name.

In [0]:
%sql
-- Author a query that returns the Total List Price grouped by Supplier and Product and sorted by Total List Price descending.
CREATE OR REPLACE TABLE northwind_dlh.fact_inventory_transaction_summary_gold AS (
  SELECT created_quarter AS Quarter,
         transaction_type AS TransactionType,
         product_name AS Product,
         SUM(quantity) AS TotalQuantity
  FROM northwind_dlh.fact_inventory_transactions_silver
  GROUP BY Quarter, TransactionType, Product
  ORDER BY TotalQuantity DESC
);

#### 8.0. Use AutoLoader to Process Streaming (Hot Path) Inventory Transactions Fact Data 
##### 8.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Use spark.readStream and the AutoLoader to read in the JSON files in the "inventory_trans_stream_dir"
# directory and then create a TempView named "inventory_transactions_raw_tempview".
# Be sure to set the "cloudFiles.schemaLocation" Option using the "inventory_trans_output_bronze" directory
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", inventory_trans_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(inventory_trans_stream_dir)
 .createOrReplaceTempView("inventory_transactions_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */

In [0]:
%sql
SELECT * FROM inventory_transactions_raw_tempview

In [0]:
(spark.table("inventory_transactions_bronze_tempview")
     .writeStream
     .format("delta")
     .option("checkpointLocation", f"{inventory_trans_output_bronze}/_checkpoint")
     .outputMode("append")
     .table("fact_inventory_transactions_bronze"))

##### 8.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_inventory_transactions_bronze")
  .createOrReplaceTempView("inventory_transactions_silver_tempview"))

In [0]:
%sql
SELECT * FROM inventory_transactions_silver_tempview

In [0]:
%sql
DESCRIBE EXTENDED inventory_transactions_silver_tempview

In [0]:
%sql
-- Create a new Temporary View named "fact_inventory_transactions_silver_tempview" by selecting data from
-- "inventory_transactions_silver_tempview" and joining it to the Product and Data dimension tables.
-- Remember that the Date dimension can serve as a "Role Playing" dimension by being Joined upon multiple times.
CREATE OR REPLACE TEMPORARY VIEW fact_inventory_transactions_silver_tempview AS (
  SELECT 
      it.inventory_transaction_key,
      it.transaction_type,
      it.transaction_created_date_key,
      it.transaction_modified_date_key,
      it.product_key,
      it.quantity,
      it.comments,
      
      -- Product dimension attributes
      p.product_code,
      p.product_name,
      p.description,
      p.supplier_company,
      p.standard_cost,
      p.list_price,
      p.reorder_level,
      p.target_level,
      p.quantity_per_unit,
      p.discontinued,
      p.minimum_reorder_quantity,
      p.category,
      
      -- Created Date dimension attributes (role-playing)
      cd.date_key as created_date_key,
      cd.full_date_alternate_key as created_full_date,
      cd.day_number_of_week as created_day_number_of_week,
      cd.day_name_of_week as created_day_name_of_week,
      cd.day_number_of_month as created_day_of_month,
      cd.day_number_of_year as created_day_of_year,
      cd.weekday_weekend as created_weekday_weekend,
      cd.week_number_of_year as created_week_of_year,
      cd.month_name as created_month_name,
      cd.month_number_of_year as created_month_number,
      cd.calendar_quarter as created_quarter,
      cd.calendar_year as created_year,
      cd.calendar_semester as created_semester,
      
      -- Modified Date dimension attributes (role-playing)
      md.date_key as modified_date_key,
      md.full_date_alternate_key as modified_full_date,
      md.day_number_of_week as modified_day_number_of_week,
      md.day_name_of_week as modified_day_name_of_week,
      md.day_number_of_month as modified_day_of_month,
      md.day_number_of_year as modified_day_of_year,
      md.weekday_weekend as modified_weekday_weekend,
      md.week_number_of_year as modified_week_of_year,
      md.month_name as modified_month_name,
      md.month_number_of_year as modified_month_number,
      md.calendar_quarter as modified_quarter,
      md.calendar_year as modified_year,
      md.calendar_semester as modified_semester
      
  FROM inventory_transactions_silver_tempview it
  
  INNER JOIN northwind_dlh.dim_product p
  ON it.product_key = p.product_key
  
  LEFT OUTER JOIN northwind_dlh.dim_date cd
  ON it.transaction_created_date_key = cd.date_key
  
  LEFT OUTER JOIN northwind_dlh.dim_date md
  ON it.transaction_modified_date_key = md.date_key
)

In [0]:
(spark.table("fact_inventory_transactions_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inventory_trans_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_transactions_silver"))

In [0]:
%sql
SELECT * FROM fact_inventory_transactions_silver

In [0]:
%sql
DESCRIBE EXTENDED fact_inventory_transactions_silver

##### 8.3. Gold Table: Perform Aggregations

In [0]:
%sql
-- Author a query that returns the Total Quantity grouped by the Quarter Created, Inventory Transaction Type, and Product
-- Sort by the Total Quantity Descending
CREATE OR REPLACE TABLE northwind_dlh.fact_inventory_transaction_summary_gold AS (
  SELECT created_quarter AS Quarter,
         transaction_type AS TransactionType,
         product_name AS Product,
         SUM(quantity) AS TotalQuantity
  FROM northwind_dlh.fact_inventory_transactions_silver
  GROUP BY Quarter, TransactionType, Product
  ORDER BY TotalQuantity DESC
);

#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/lab_data/