## DS 2002 Data Project 2 - Kristy Luk

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
# NOTE(review): the user name and password below are stored in plain text.
# Consider loading them from a Databricks secret scope or environment
# variables before this notebook is shared or committed.
jdbc_hostname = "uec6ct-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "adventureworks_dw"

# Properties dictionary in the shape expected by get_sql_dataframe()'s conn_props argument.
connection_properties = {
  "user" : "kristyluk",
  "password" : "Kwl1014810473!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
# NOTE(review): plain-text credentials here as well - see the note above.
atlas_cluster_name = "cluster0.y4eygtf"
atlas_database_name = "adventureworks_dw"
atlas_user_name = "uec6ct"
atlas_password = "Passw0rd1234"

# Data Files (JSON) Information ###############################
# Name of the destination (lakehouse) database.
dst_database = "adventureworks_dlh"

# Root folder on DBFS; the destination database files live directly beneath it.
base_dir = "dbfs:/FileStore/project_data"
database_dir = f"{base_dir}/{dst_database}"

# Source data folders: batch (cold-path) and stream (hot-path).
data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# Output folders for the bronze / silver / gold stages of fact_orders;
# later cells use them for schema and checkpoint locations.
output_bronze = f"{database_dir}/fact_orders/bronze"
output_silver = f"{database_dir}/fact_orders/silver"
output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Streaming Files ################################## 
# The second argument (True) requests a recursive delete.
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 

# Delete the Database Files ###################################
# database_dir is the parent of the fact_orders folder removed above, so this
# call also covers that path; every run therefore starts from an empty folder.
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [0]:

######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure MySQL database server.
######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Query the Azure MySQL server over JDBC and return the result as a Spark DataFrame.

    Args:
        host_name: Host name of the database server.
        port: TCP port the server listens on.
        db_name: Name of the database to connect to.
        conn_props: Dict of JDBC connection properties (e.g. user, password, driver)
            passed straight through to spark.read.jdbc().
        sql_query: Value for the ``table`` argument of spark.read.jdbc(): a table name,
            or a parenthesized sub-query with an alias.

    Returns:
        The DataFrame produced by spark.read.jdbc().
    '''
    # The URL scheme must match the MySQL-compatible driver supplied in conn_props.
    # The previous "jdbc:sqlserver://...;database=..." form is SQL Server syntax and
    # cannot be used to reach the MySQL server this notebook targets.
    jdbc_url = f"jdbc:mysql://{host_name}:{port}/{db_name}"

    return spark.read.jdbc(url=jdbc_url, table=sql_query, properties=conn_props)

##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Args:
        user_id: Atlas user name.
        pwd: Atlas password (inserted into the connection URI as given).
        cluster_name: Atlas cluster name, i.e. the host part before ".mongodb.net".
        db_name: Database to query.
        collection: Name of the collection to read.
        conditions: Query filter; a falsy value (None, {}) selects every document.
        projection: Field projection; a falsy value returns whole documents.
        sort: Sort specification handed to Cursor.sort(); a falsy value leaves the
            result unsorted.

    Returns:
        A DataFrame (built with the notebook's ``pd`` alias) holding the documents.
    '''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    try:
        # Each argument is applied independently. The earlier if/elif ladder fell back
        # to an unfiltered find() unless conditions AND projection were both supplied,
        # silently discarding e.g. a filter passed without a projection.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)

        dframe = pd.DataFrame(list(cursor))
    finally:
        # Release the connection even when the query or DataFrame construction fails.
        client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''(Re)create MongoDB collections from JSON files.

    Every target collection is dropped before its documents are inserted, so
    running the function again replaces the data instead of duplicating it.

    Args:
        user_id: Atlas user name.
        pwd: Atlas password (inserted into the connection URI as given).
        cluster_name: Atlas cluster name, i.e. the host part before ".mongodb.net".
        db_name: Database that receives the collections.
        src_file_path: Directory containing the JSON files; it is read with the
            built-in open(), so it must be a path the local file API can reach.
        json_files: Mapping of collection name -> JSON file name inside src_file_path.
            Each file must hold a list of documents accepted by insert_many().

    Returns:
        The insert_many() result for the last file processed, or None when
        json_files is empty.
    '''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    # Defined up front so an empty json_files mapping returns None instead of
    # failing with UnboundLocalError at the return statement.
    result = None

    try:
        db = client[db_name]

        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)

            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)

            result = db[collection_name].insert_many(documents)
    finally:
        # Close the connection even if a file is missing or an insert fails.
        client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
-- Start from a clean slate: remove the metadata database if it exists.
-- CASCADE also drops the tables registered inside it.
DROP DATABASE IF EXISTS adventureworks_dlh CASCADE;

In [0]:
%sql
-- Create the metadata database and point it at the DBFS folder where its data will go.
CREATE DATABASE IF NOT EXISTS adventureworks_dlh
COMMENT "DS-2002 Capstone Project Database"
LOCATION "dbfs:/FileStore/project_data/adventureworks_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone Project");

##### 1.2. Create a New Table that Sources Product Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
-- Temporary view over the dim_products table in the Azure MySQL adventureworks_dw database.
-- The connection requires SSL and, like view_date, leaves server certificate
-- verification enabled (the earlier verifyServerCertificate=false flag was removed).
-- NOTE(review): the user name and password are stored in plain text in this cell;
-- consider moving them to a secret store before sharing the notebook.
CREATE OR REPLACE TEMPORARY VIEW view_products
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://uec6ct-mysql.mysql.database.azure.com:3306/adventureworks_dw?useSSL=true&requireSSL=true",
  dbtable "dim_products",
  user "kristyluk",
  password "Kwl1014810473!"
)

In [0]:
%sql
-- Make sure the destination database exists (no-op when an earlier cell already created it).

CREATE DATABASE IF NOT EXISTS adventureworks_dlh;

-- Make it the current database for the statements that follow.
USE adventureworks_dlh;

-- Materialize the JDBC-backed view_products into a table stored under the
-- database folder on DBFS; OR REPLACE lets the cell be re-run safely.
CREATE OR REPLACE TABLE adventureworks_dlh.dim_products
COMMENT "Products Dimension Table"
LOCATION "dbfs:/FileStore/project_data/adventureworks_dlh/dim_products"
AS SELECT * FROM view_products;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column names and data types of the new products dimension table.
DESCRIBE EXTENDED adventureworks_dlh.dim_products;

col_name,data_type,comment
ProductKey,int,
ProductID,int,
Name,varchar(50),
ProductNumber,varchar(25),
StandardCost,double,
ListPrice,double,
Size,varchar(5),
Weight,"decimal(8,2)",
DaysToManufacture,int,
SellStartDate,timestamp,


In [0]:
%sql
-- Preview a handful of rows to confirm that the products data was loaded.
SELECT * FROM adventureworks_dlh.dim_products LIMIT 5

ProductKey,ProductID,Name,ProductNumber,StandardCost,ListPrice,Size,Weight,DaysToManufacture,SellStartDate,SellEndDate
30,1,Adjustable Race,AR-5381,0.0,0.0,,,0,1998-06-01T00:00:00Z,
31,2,Bearing Ball,BA-8327,0.0,0.0,,,0,1998-06-01T00:00:00Z,
32,3,BB Ball Bearing,BE-2349,0.0,0.0,,,1,1998-06-01T00:00:00Z,
33,4,Headset Ball Bearings,BE-2908,0.0,0.0,,,0,1998-06-01T00:00:00Z,
34,316,Blade,BL-2036,0.0,0.0,,,1,1998-06-01T00:00:00Z,


##### 1.3. Create a New Table that Sources Date Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a temporary view named "view_date" over the dim_date table in the
-- Azure MySQL adventureworks_dw database.
-- NOTE(review): the user name and password are stored in plain text in this cell;
-- consider moving them to a secret store before sharing the notebook.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://uec6ct-mysql.mysql.database.azure.com:3306/adventureworks_dw?useSSL=true&requireSSL=true", -- SSL is enabled and required
  dbtable "dim_date",
  user "kristyluk",    
  password "Kwl1014810473!"  
)

In [0]:
%sql
-- Create a new table named "adventureworks_dlh.dim_date" using data from the view named "view_date".
-- The USE statement makes adventureworks_dlh the current database for the session.
USE DATABASE adventureworks_dlh;

-- OR REPLACE lets the cell be re-run; the table data is stored under the database folder on DBFS.
CREATE OR REPLACE TABLE adventureworks_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/project_data/adventureworks_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column names and data types of the new date dimension table.
DESCRIBE EXTENDED adventureworks_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
-- Preview a handful of rows to confirm that the date data was loaded.
SELECT * FROM adventureworks_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
19980101,1998-01-01,1998/01/01,01/01/1998,01/01/1998,5,Thursday,1,1,Weekday,1,January,1,N,1,1998,1998-01,1998Q1,7,3,1998,1998-07,1998Q3
19980102,1998-01-02,1998/01/02,01/02/1998,02/01/1998,6,Friday,2,2,Weekday,1,January,1,N,1,1998,1998-01,1998Q1,7,3,1998,1998-07,1998Q3
19980103,1998-01-03,1998/01/03,01/03/1998,03/01/1998,7,Saturday,3,3,Weekend,1,January,1,N,1,1998,1998-01,1998Q1,7,3,1998,1998-07,1998Q3
19980104,1998-01-04,1998/01/04,01/04/1998,04/01/1998,1,Sunday,4,4,Weekend,1,January,1,N,1,1998,1998-01,1998Q1,7,3,1998,1998-07,1998Q3
19980105,1998-01-05,1998/01/05,01/05/1998,05/01/1998,2,Monday,5,5,Weekday,2,January,1,N,1,1998,1998-01,1998Q1,7,3,1998,1998-07,1998Q3


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
# Define the directory path.
# NOTE(review): this rebinds batch_dir to the same value the global-variables
# cell already derives from base_dir; the literal here must be kept in sync with it.
batch_dir = "dbfs:/FileStore/project_data/source_data/batch"

# Check if the directory exists: a failed listing is treated as "directory missing".
try:
    dbutils.fs.ls(batch_dir)
    print(f"Directory exists: {batch_dir}")
except Exception as e:
    # Any exception from the listing lands here (not only "path not found"),
    # and the directory is then created.
    print(f"Directory does not exist. Creating directory: {batch_dir}")
    dbutils.fs.mkdirs(batch_dir)

Directory exists: dbfs:/FileStore/project_data/source_data/batch


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
# Local-file-API path of the batch folder: set_mongo_collection() reads the
# files with open(), so the "/dbfs/..." form is used instead of "dbfs:/...".
source_dir = '/dbfs/FileStore/project_data/source_data/batch'

# Collection name -> JSON file that populates it.
json_files = {"purchase" : 'purchase.json'}

# Drop and reload the "purchase" collection; the call is the last expression
# in the cell, so its return value is displayed as the cell output.
set_mongo_collection(
    user_id=atlas_user_name,
    pwd=atlas_password,
    cluster_name=atlas_cluster_name,
    db_name=atlas_database_name,
    src_file_path=source_dir,
    json_files=json_files,
)

<pymongo.results.InsertManyResult at 0x7ffac194fd80>

##### 2.3.1. Fetch Purchase Dimension Data from the New MongoDB Collection

In [0]:
%scala
// Bring the MongoDB Spark connector into scope for the following Scala cells.
import com.mongodb.spark._

// NOTE(review): these values repeat, in plain text, the Atlas credentials defined
// in the Python global-variables cell; consider reading them from a secret store
// and keeping a single source of truth.
val userName = "uec6ct"
val pwd = "Passw0rd1234"
val clusterName = "cluster0.y4eygtf"
// Connection string used by the Spark reader in the next cell.
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

// Load the "purchase" collection of the adventureworks_dw database through the
// MongoDB Spark connector, keeping only the columns needed for the purchase dimension.
val df_purchase = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "adventureworks_dw")
.option("collection", "purchase").load()
.select("PurchaseOrderDetailID","DueDate","OrderQty","ProductID","UnitPrice","ReceivedQty","RejectedQty","StockedQty","ModifiedDate")

// Render the DataFrame as the cell output.
display(df_purchase)

PurchaseOrderDetailID,DueDate,OrderQty,ProductID,UnitPrice,ReceivedQty,RejectedQty,StockedQty,ModifiedDate
1,2001-05-31 00:00:00,4,1,50.26,3.0,0.0,3.0,2001-05-24 00:00:00
2,2001-05-31 00:00:00,3,359,45.12,3.0,0.0,3.0,2001-05-24 00:00:00
3,2001-05-31 00:00:00,3,360,45.5805,3.0,0.0,3.0,2001-05-24 00:00:00
4,2001-05-31 00:00:00,550,530,16.086,550.0,0.0,550.0,2001-05-24 00:00:00
5,2001-05-31 00:00:00,3,4,57.0255,2.0,1.0,1.0,2001-05-24 00:00:00
6,2001-06-14 00:00:00,550,512,37.086,550.0,0.0,550.0,2001-06-07 00:00:00
7,2001-06-14 00:00:00,550,513,26.5965,468.0,0.0,468.0,2001-06-07 00:00:00
8,2001-06-14 00:00:00,550,317,27.0585,550.0,0.0,550.0,2001-06-07 00:00:00
9,2001-06-14 00:00:00,550,318,33.579,550.0,0.0,550.0,2001-06-07 00:00:00
10,2001-06-14 00:00:00,550,319,46.0635,550.0,0.0,550.0,2001-06-07 00:00:00


In [0]:
%scala
// Print the schema Spark derived for the purchase documents.
df_purchase.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Purchase Dimension Table in the Databricks Metadata Database (adventureworks_dlh)

In [0]:
%scala
// Persist the purchase data as a Delta table; "overwrite" replaces any previous contents on re-run.
df_purchase.write.format("delta").mode("overwrite").saveAsTable("adventureworks_dlh.dim_purchase")

In [0]:
%sql
-- Inspect the column names and data types of the new purchase dimension table.
DESCRIBE EXTENDED adventureworks_dlh.dim_purchase

col_name,data_type,comment
PurchaseOrderDetailID,int,
DueDate,string,
OrderQty,int,
ProductID,int,
UnitPrice,double,
ReceivedQty,double,
RejectedQty,double,
StockedQty,double,
ModifiedDate,string,
,,


In [0]:
%sql
-- Preview a handful of rows to confirm that the purchase data was loaded.
SELECT * FROM adventureworks_dlh.dim_purchase LIMIT 5

PurchaseOrderDetailID,DueDate,OrderQty,ProductID,UnitPrice,ReceivedQty,RejectedQty,StockedQty,ModifiedDate
1,2001-05-31 00:00:00,4,1,50.26,3.0,0.0,3.0,2001-05-24 00:00:00
2,2001-05-31 00:00:00,3,359,45.12,3.0,0.0,3.0,2001-05-24 00:00:00
3,2001-05-31 00:00:00,3,360,45.5805,3.0,0.0,3.0,2001-05-24 00:00:00
4,2001-05-31 00:00:00,550,530,16.086,550.0,0.0,550.0,2001-05-24 00:00:00
5,2001-05-31 00:00:00,3,4,57.0255,2.0,1.0,1.0,2001-05-24 00:00:00


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
# Full path of the sales order detail extract inside the batch folder.
sales_order_detail_csv = f"{batch_dir}/sales_order_detail.csv"

# Read the CSV: the first row holds the column names, and Spark infers the
# column types from the data.
df_sales_order_detail = (
    spark.read
         .format('csv')
         .option('header', 'true')
         .option('inferSchema', 'true')
         .load(sales_order_detail_csv)
)

display(df_sales_order_detail)

SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,LineTotal,ModifiedDate
1,4911-403C-98,1,776,1,2024.994,2024.994,2001-07-01T00:00:00Z
2,4911-403C-98,3,777,1,2024.994,6074.982,2001-07-01T00:00:00Z
3,4911-403C-98,1,778,1,2024.994,2024.994,2001-07-01T00:00:00Z
4,4911-403C-98,1,771,1,2039.994,2039.994,2001-07-01T00:00:00Z
5,4911-403C-98,1,772,1,2039.994,2039.994,2001-07-01T00:00:00Z
6,4911-403C-98,2,773,1,2039.994,4079.988,2001-07-01T00:00:00Z
7,4911-403C-98,1,774,1,2039.994,2039.994,2001-07-01T00:00:00Z
8,4911-403C-98,3,714,1,28.8404,86.5212,2001-07-01T00:00:00Z
9,4911-403C-98,1,716,1,28.8404,28.8404,2001-07-01T00:00:00Z
10,4911-403C-98,6,709,1,5.7,34.2,2001-07-01T00:00:00Z


In [0]:
# Print the schema that Spark inferred from the CSV file.
df_sales_order_detail.printSchema()

root
 |-- SalesOrderDetailID: integer (nullable = true)
 |-- CarrierTrackingNumber: string (nullable = true)
 |-- OrderQty: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- SpecialOfferID: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)



In [0]:
# Persist the CSV data as a Delta table; "overwrite" replaces any previous contents on re-run.
df_sales_order_detail.write.format("delta").mode("overwrite").saveAsTable("adventureworks_dlh.dim_sales_order_detail")

In [0]:
%sql
-- Inspect the column names and data types of the new sales order detail table.
DESCRIBE EXTENDED adventureworks_dlh.dim_sales_order_detail;

col_name,data_type,comment
SalesOrderDetailID,int,
CarrierTrackingNumber,string,
OrderQty,int,
ProductID,int,
SpecialOfferID,int,
UnitPrice,double,
LineTotal,double,
ModifiedDate,timestamp,
,,
# Delta Statistics Columns,,


In [0]:
%sql
-- Preview a handful of rows to confirm that the sales order detail data was loaded.
SELECT * FROM adventureworks_dlh.dim_sales_order_detail LIMIT 5;

SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,LineTotal,ModifiedDate
1,4911-403C-98,1,776,1,2024.994,2024.994,2001-07-01T00:00:00Z
2,4911-403C-98,3,777,1,2024.994,6074.982,2001-07-01T00:00:00Z
3,4911-403C-98,1,778,1,2024.994,2024.994,2001-07-01T00:00:00Z
4,4911-403C-98,1,771,1,2039.994,2039.994,2001-07-01T00:00:00Z
5,4911-403C-98,1,772,1,2039.994,2039.994,2001-07-01T00:00:00Z


##### Verify Dimension Tables

In [0]:
%sql
-- Verify that dim_date, dim_products, dim_purchase, and dim_sales_order_detail
-- are all present in the adventureworks_dlh database.
USE adventureworks_dlh;
SHOW TABLES

database,tableName,isTemporary
adventureworks_dlh,dim_date,False
adventureworks_dlh,dim_products,False
adventureworks_dlh,dim_purchase,False
adventureworks_dlh,dim_sales_order_detail,False
,view_date,True
,view_products,True


### Section III: Integrate Reference Data with Real-Time Data
#### 1.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 1.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Define the directory path.
# NOTE(review): this rebinds stream_dir to the same value the global-variables
# cell already derives from base_dir; the literal here must be kept in sync with it.
stream_dir = "dbfs:/FileStore/project_data/source_data/stream"

# Check if the directory exists: a failed listing is treated as "directory missing".
try:
    dbutils.fs.ls(stream_dir)
    print(f"Directory exists: {stream_dir}")
except Exception as e:
    # Any exception from the listing lands here (not only "path not found"),
    # and the directory is then created.
    print(f"Directory does not exist. Creating directory: {stream_dir}")
    dbutils.fs.mkdirs(stream_dir)

Directory exists: dbfs:/FileStore/project_data/source_data/stream


In [0]:
# Use Auto Loader ("cloudFiles") to read the JSON files in stream_dir as a stream
# and expose the result to SQL as the temporary view orders_raw_tempview.
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 # A reader keeps only one value per option key, so calling
 # .option("cloudFiles.schemaHints", ...) once per column left just the last
 # hint in effect. All hints are therefore passed as ONE comma-separated string.
 .option("cloudFiles.schemaHints",
         "CarrierTrackingNumber STRING, "
         "DaysToManufacture BIGINT, "
         "DueDate STRING, "
         "LineTotal DOUBLE, "
         "ListPrice DOUBLE, "
         "ModifiedDate STRING, "
         "OrderQty BIGINT, "
         "ProductID BIGINT, "
         "PurchaseOrderDetailID BIGINT, "
         "SalesOrderDetailID BIGINT, "
         "Weight DOUBLE")
 # The inferred schema is tracked under the bronze output folder.
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
-- Inspect the schema that Auto Loader produced for the raw streaming view.
DESCRIBE orders_raw_tempview;


col_name,data_type,comment
AccountNumber,string,
CarrierTrackingNumber,string,
CustomerID,bigint,
CustomerKey,bigint,
CustomerType,string,
DaysToManufacture,bigint,
DueDate,string,
LineTotal,double,
ListPrice,double,
ModifiedDate,string,


In [0]:
%sql
-- Bronze view: every raw column plus two traceability columns -
-- when the record was received and which source file it came from.
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS
SELECT
  *,
  current_timestamp() AS receipt_time,
  input_file_name()   AS source_file
FROM orders_raw_tempview

In [0]:
%sql
-- Preview the bronze view, including the added receipt_time and source_file columns.
SELECT * FROM orders_bronze_tempview

AccountNumber,CarrierTrackingNumber,CustomerID,CustomerKey,CustomerType,DaysToManufacture,DueDate,LineTotal,ListPrice,ModifiedDate,Name,OrderQty,ProductID,ProductKey,ProductNumber,PurchaseOrderDetailID,ReceivedQty,RejectedQty,SalesOrderDetailID,SellEndDate,SellStartDate,Size,SpecialOfferID,StandardCost,StockedQty,TerritoryID,UnitPrice,Weight,_rescued_data,receipt_time,source_file
,4911-403C-98,,,,,,2024.994,,2001-07-01 00:00:00,,1,776,,,,,,1,,,,1,,,,2024.994,,,2024-05-11T01:30:54.304Z,dbfs:/FileStore/project_data/source_data/stream/sales_order_detail.json
,4911-403C-98,,,,,,6074.982,,2001-07-01 00:00:00,,3,777,,,,,,2,,,,1,,,,2024.994,,,2024-05-11T01:30:54.304Z,dbfs:/FileStore/project_data/source_data/stream/sales_order_detail.json
,4911-403C-98,,,,,,2024.994,,2001-07-01 00:00:00,,1,778,,,,,,3,,,,1,,,,2024.994,,,2024-05-11T01:30:54.304Z,dbfs:/FileStore/project_data/source_data/stream/sales_order_detail.json
,4911-403C-98,,,,,,2039.994,,2001-07-01 00:00:00,,1,771,,,,,,4,,,,1,,,,2039.994,,,2024-05-11T01:30:54.304Z,dbfs:/FileStore/project_data/source_data/stream/sales_order_detail.json
,4911-403C-98,,,,,,2039.994,,2001-07-01 00:00:00,,1,772,,,,,,5,,,,1,,,,2039.994,,,2024-05-11T01:30:54.304Z,dbfs:/FileStore/project_data/source_data/stream/sales_order_detail.json
,4911-403C-98,,,,,,4079.988,,2001-07-01 00:00:00,,2,773,,,,,,6,,,,1,,,,2039.994,,,2024-05-11T01:30:54.304Z,dbfs:/FileStore/project_data/source_data/stream/sales_order_detail.json
,4911-403C-98,,,,,,2039.994,,2001-07-01 00:00:00,,1,774,,,,,,7,,,,1,,,,2039.994,,,2024-05-11T01:30:54.304Z,dbfs:/FileStore/project_data/source_data/stream/sales_order_detail.json
,4911-403C-98,,,,,,86.5212,,2001-07-01 00:00:00,,3,714,,,,,,8,,,,1,,,,28.8404,,,2024-05-11T01:30:54.304Z,dbfs:/FileStore/project_data/source_data/stream/sales_order_detail.json
,4911-403C-98,,,,,,28.8404,,2001-07-01 00:00:00,,1,716,,,,,,9,,,,1,,,,28.8404,,,2024-05-11T01:30:54.304Z,dbfs:/FileStore/project_data/source_data/stream/sales_order_detail.json
,4911-403C-98,,,,,,34.2,,2001-07-01 00:00:00,,6,709,,,,,,10,,,,1,,,,5.7,,,2024-05-11T01:30:54.304Z,dbfs:/FileStore/project_data/source_data/stream/sales_order_detail.json


In [0]:
# Stream the raw data into a Delta table called fact_orders_bronze using the append output mode.
# The checkpoint is kept under the bronze output folder.
# NOTE(review): the table name is not database-qualified, so it is created in the
# session's current database - presumably adventureworks_dlh via the earlier USE
# statements; confirm when running cells out of order.
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7ffabad1cbb0>

##### 1.2. Silver Table: Include Reference Data

In [0]:
# Create a streaming temporary view named orders_silver_tempview over the
# fact_orders_bronze table so that the transformations can be written in SQL.
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
-- Preview the streaming view that reads from the bronze table.
SELECT * FROM orders_silver_tempview

CarrierTrackingNumber,DaysToManufacture,DueDate,LineTotal,ListPrice,ModifiedDate,Name,OrderQty,ProductID,ProductKey,ProductNumber,PurchaseOrderDetailID,ReceivedQty,RejectedQty,SalesOrderDetailID,SellEndDate,SellStartDate,Size,SpecialOfferID,StandardCost,StockedQty,UnitPrice,Weight,_rescued_data,receipt_time,source_file


In [0]:
%sql
-- Inspect the column names and data types exposed by the silver streaming view.
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
CarrierTrackingNumber,string,
DaysToManufacture,bigint,
DueDate,string,
LineTotal,double,
ListPrice,double,
ModifiedDate,string,
Name,string,
OrderQty,bigint,
ProductID,bigint,
ProductKey,bigint,


In [0]:
%sql
-- Combine the streamed order records (alias t) with the three dimension tables
-- to build the silver-level fact view.
--   p   = dim_products            (matched on ProductID)
--   pu  = dim_purchase            (matched on PurchaseOrderDetailID)
--   sod = dim_sales_order_detail  (matched on SalesOrderDetailID)
-- NOTE(review): all three joins are INNER, so a streamed row is kept only when it
-- matches a product AND a purchase AND a sales order detail. The bronze sample rows
-- shown earlier carry a SalesOrderDetailID but no PurchaseOrderDetailID, which would
-- exclude them here - confirm whether this is intended, since the silver and gold
-- outputs captured below are empty.
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT t.CarrierTrackingNumber,
         t.DaysToManufacture, 
         t.DueDate, 
         t.LineTotal, 
         t.ListPrice,
         t.ModifiedDate,
         t.ProductID,
         -- Product attributes come from the products dimension.
         p.StandardCost,
         p.ProductNumber, 
         p.SellEndDate, 
         p.SellStartDate,
         t.PurchaseOrderDetailID,
         -- Quantities and unit price come from the purchase dimension.
         pu.ReceivedQty,
         pu.RejectedQty,
         pu.StockedQty,
         pu.UnitPrice,
         t.SalesOrderDetailID,
         -- The special offer comes from the sales order detail dimension.
         sod.SpecialOfferID, 
         t.Weight
  FROM orders_silver_tempview AS t
  INNER JOIN adventureworks_dlh.dim_products AS p
  ON t.ProductID = p.ProductID
  INNER JOIN adventureworks_dlh.dim_purchase AS pu
  ON t.PurchaseOrderDetailID = pu.PurchaseOrderDetailID
  INNER JOIN adventureworks_dlh.dim_sales_order_detail AS sod
  ON t.SalesOrderDetailID = sod.SalesOrderDetailID
);

In [0]:
# Stream the joined silver view into a Delta table called fact_orders_silver using
# the append output mode. The checkpoint is kept under the silver output folder.
# NOTE(review): the table name is not database-qualified here, while later cells
# read adventureworks_dlh.fact_orders_silver - this presumably relies on
# adventureworks_dlh being the session's current database; confirm.
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7ffabadec070>

In [0]:
%sql
-- Preview the rows written to the silver fact table so far.
SELECT * FROM fact_orders_silver

CarrierTrackingNumber,DaysToManufacture,DueDate,LineTotal,ListPrice,ModifiedDate,Name,OrderQty,ProductID,ProductKey,ProductNumber,PurchaseOrderDetailID,ReceivedQty,RejectedQty,SalesOrderDetailID,SellEndDate,SellStartDate,Size,SpecialOfferID,StandardCost,StockedQty,UnitPrice,Weight,_rescued_data


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
-- Inspect the column names and data types of the silver fact table.
DESCRIBE EXTENDED adventureworks_dlh.fact_orders_silver

col_name,data_type,comment
PurchaseOrderDetailID,int,
CarrierTrackingNumber,string,
ProductID,int,
Size,varchar(5),
Weight,"decimal(8,2)",
DueDate,string,
OrderQty,int,
StandardCost,double,
ListPrice,double,
UnitPrice,double,


##### 1.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS (CREATE TABLE AS SELECT) approach. The query selects the ProductID and StandardCost columns and sums UnitPrice into a new column named TotalUnitPrice. The rows are grouped by ProductID, StandardCost, and ListPrice, and the results are ordered by TotalUnitPrice in ascending order.

In [0]:
%sql
-- Gold table: total UnitPrice per product, built with CREATE TABLE ... AS SELECT.
-- Rows are grouped by ProductID, StandardCost, and ListPrice; ListPrice takes part
-- in the grouping even though it is not part of the output columns.
CREATE OR REPLACE TABLE adventureworks_dlh.fact_orders_gold AS (
  SELECT ProductID, 
    StandardCost,
    SUM(UnitPrice) AS TotalUnitPrice
  FROM adventureworks_dlh.fact_orders_silver
  GROUP BY ProductID, StandardCost, ListPrice
);

-- A table has no inherent row order, so an ORDER BY inside the CTAS does not
-- control how later reads are returned. The ordering is applied to the query
-- that actually displays the result.
SELECT * FROM adventureworks_dlh.fact_orders_gold
ORDER BY TotalUnitPrice ASC;

ProductID,StandardCost,TotalUnitPrice


This query calculates the number of purchase order details for each ProductID, then joins that result with the existing fact_orders_silver table on the ProductID column. Finally, it returns the ProductID, StandardCost (as Cost), and PurchaseOrderDetailCount columns, ordered by PurchaseOrderDetailCount in ascending order.

In [0]:
%sql
-- Step 1: count the purchase order details recorded for each product.
WITH product_counts AS (
  SELECT ProductID,
         COUNT(PurchaseOrderDetailID) AS PurchaseOrderDetailCount
  FROM adventureworks_dlh.fact_orders_silver
  GROUP BY ProductID
)
-- Step 2: attach the per-product count to every silver row and list the rows
-- from the lowest count to the highest.
SELECT product_counts.ProductID,
       os.StandardCost AS Cost,
       product_counts.PurchaseOrderDetailCount
FROM adventureworks_dlh.fact_orders_silver AS os
INNER JOIN product_counts
  ON product_counts.ProductID = os.ProductID
ORDER BY product_counts.PurchaseOrderDetailCount ASC



ProductID,Cost,PurchaseOrderDetailCount
