## DS 2002: Final Project 

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
# NOTE(review): the JDBC settings in this group are not referenced by any cell visible in
# this notebook (the %sql cells embed their own URL and credentials) - confirm before removing.
# NOTE(review): credentials are stored in plain text; consider a secrets manager
# (e.g. Databricks secrets) before sharing or committing this notebook.
jdbc_hostname = "rvt9bx-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "adventureworks_dw"

connection_properties = {
  "user" : "rvt9bx",
  "password" : "Ronddejambe2003",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
# Passed to set_mongo_collection() when the ship-method JSON file is uploaded.
# NOTE(review): plain-text password - same concern as above.
atlas_cluster_name = "cluster0.zrzzkeo"
atlas_database_name = "adventureworks_dw"
atlas_user_name = "rvt9bx"
atlas_password = "Ronddejambe2003"

# Data Files (JSON) Information ###############################
# Name of the destination (lakehouse) database; also used as its directory name.
dst_database = "adventureworks_dlh"

# Root DBFS folder that holds both the source files and the destination database files.
base_dir = "dbfs:/FileStore/final-data"
database_dir = f"{base_dir}/{dst_database}"

# Source data: reference files live under "batch", purchase-order files under "stream".
data_dir = f"{base_dir}"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# Directory the streaming reader loads purchase-order JSON files from.
purchase_orders_stream_dir = f"{stream_dir}"

# Per-layer output folders for the purchase-orders fact (used for schema/checkpoint locations).
purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"


# Delete the Streaming Files ################################## 
# Removes bronze/silver/gold outputs left over from a previous run.
# This folder sits inside database_dir, so the next call would remove it as well.
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 

# Delete the Database Files ###################################
# Recursively removes everything stored for the destination database.
dbutils.fs.rm(database_dir, True)

False

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Fetch documents from a MongoDB Atlas collection into a DataFrame.

    Args:
        user_id: Atlas database user name.
        pwd: Password for ``user_id``.
        cluster_name: Atlas cluster host prefix; ``.mongodb.net`` is appended.
        db_name: Database that holds ``collection``.
        collection: Name of the collection to query.
        conditions: Query filter passed to ``find()``; a falsy value means "no filter".
        projection: Projection passed to ``find()``; a falsy value means "all fields".
        sort: Sort specification passed to ``cursor.sort()``; a falsy value means "unsorted".

    Returns:
        A ``pd.DataFrame`` built from the list of matching documents
        (``pd`` is whatever DataFrame library the notebook imported under that name).
    '''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    try:
        # Each argument is applied independently, so a filter is never silently dropped
        # just because no projection (or no sort) was supplied alongside it.
        cursor = client[db_name][collection].find(conditions or None, projection or None)
        if sort:
            cursor = cursor.sort(sort)

        # Materialize the cursor while the connection is still open.
        dframe = pd.DataFrame(list(cursor))
    finally:
        # Always release the connection, even when the query or conversion fails.
        client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''(Re)create MongoDB collections from JSON files.

    For every ``{collection_name: file_name}`` entry the existing collection is dropped
    and then refilled with the documents read from ``src_file_path/file_name``.

    Args:
        user_id: Atlas database user name.
        pwd: Password for ``user_id``.
        cluster_name: Atlas cluster host prefix; ``.mongodb.net`` is appended.
        db_name: Database in which the collections are created.
        src_file_path: Local directory that contains the JSON files.
        json_files: Mapping of collection name to JSON file name.

    Returns:
        The ``insert_many`` result of the last file loaded, or ``None`` when
        ``json_files`` is empty.
    '''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    # Defined up front so an empty mapping returns None instead of raising on `return`.
    result = None

    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            # Dropping first is what makes a re-run replace the data instead of duplicating it.
            db.drop_collection(collection_name)

            with open(os.path.join(src_file_path, file_name), 'r') as openfile:
                documents = json.load(openfile)

            result = db[collection_name].insert_many(documents)
    finally:
        # Always release the connection, even when a file is missing or an insert fails.
        client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
-- Start from a clean slate: remove the lakehouse database and, via CASCADE, every table in it.
DROP DATABASE IF EXISTS adventureworks_dlh CASCADE;

In [0]:
%sql
-- Create the lakehouse database and store its files under the DBFS folder given by LOCATION.
CREATE DATABASE IF NOT EXISTS adventureworks_dlh
COMMENT "DS-2002 Final Database"
LOCATION "dbfs:/FileStore/final-data/adventureworks_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
-- Create a Temporary View named "view_date" that reads the "dim_date" table from the
-- MySQL "adventureworks_dw" database over JDBC.
-- NOTE(review): the password is stored in plain text in this cell; consider a secrets manager.

CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://rvt9bx-mysql.mysql.database.azure.com:3306/adventureworks_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "rvt9bx",    --Replace with your User Name
  password "Ronddejambe2003"  --Replace with your password
)

In [0]:
%sql
USE DATABASE adventureworks_dlh;

-- Create a new table named "adventureworks_dlh.dim_date" using data from the view named "view_date".
-- The table files are written to the DBFS folder given by LOCATION.

CREATE OR REPLACE TABLE adventureworks_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final-data/adventureworks_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Show the columns, data types and extended metadata of the date dimension table.
DESCRIBE EXTENDED adventureworks_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
-- Preview five rows of the date dimension table.
SELECT * FROM adventureworks_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
19950101,1995-01-01,1995/01/01,01/01/1995,01/01/1995,1,Sunday,1,1,Weekend,52,January,1,N,1,1995,1995-01,1995Q1,7,3,1995,1995-07,1995Q3
19950102,1995-01-02,1995/01/02,01/02/1995,02/01/1995,2,Monday,2,2,Weekday,1,January,1,N,1,1995,1995-01,1995Q1,7,3,1995,1995-07,1995Q3
19950103,1995-01-03,1995/01/03,01/03/1995,03/01/1995,3,Tuesday,3,3,Weekday,1,January,1,N,1,1995,1995-01,1995Q1,7,3,1995,1995-07,1995Q3
19950104,1995-01-04,1995/01/04,01/04/1995,04/01/1995,4,Wednesday,4,4,Weekday,1,January,1,N,1,1995,1995-01,1995Q1,7,3,1995,1995-07,1995Q3
19950105,1995-01-05,1995/01/05,01/05/1995,05/01/1995,5,Thursday,5,5,Weekday,1,January,1,N,1,1995,1995-01,1995Q1,7,3,1995,1995-07,1995Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_products" that reads the "dim_products" table from the
-- MySQL "adventureworks_dw" database over JDBC.
-- NOTE(review): the password is stored in plain text in this cell; consider a secrets manager.

CREATE OR REPLACE TEMPORARY VIEW view_products
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://rvt9bx-mysql.mysql.database.azure.com:3306/adventureworks_dw", --Replace with your Server Name
  dbtable "dim_products",
  user "rvt9bx",    --Replace with your User Name
  password "Ronddejambe2003"  --Replace with your password
)

In [0]:
%sql
USE DATABASE adventureworks_dlh;

-- Create a new table named "adventureworks_dlh.dim_products" using data from the view named "view_products".
-- The table files are written to the DBFS folder given by LOCATION.

CREATE OR REPLACE TABLE adventureworks_dlh.dim_products
COMMENT "Product Dimension Table"
LOCATION "dbfs:/FileStore/final-data/adventureworks_dlh/dim_products"
AS SELECT * FROM view_products

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Show the columns, data types and extended metadata of the product dimension table.
DESCRIBE EXTENDED adventureworks_dlh.dim_products;

col_name,data_type,comment
product_key,bigint,
ProductID,bigint,
Name,varchar(65535),
ProductNumber,varchar(65535),
MakeFlag,bigint,
FinishedGoodsFlag,bigint,
SafetyStockLevel,bigint,
ReorderPoint,bigint,
StandardCost,double,
ListPrice,double,


In [0]:
%sql
-- Preview five rows of the product dimension table.
SELECT * FROM adventureworks_dlh.dim_products LIMIT 5

product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,DaysToManufacture,modified_date_key
1,1,Adjustable Race,AR-5381,0,0,1000,750,0.0,0.0,0,20040311
2,2,Bearing Ball,BA-8327,0,0,1000,750,0.0,0.0,0,20040311
3,3,BB Ball Bearing,BE-2349,1,0,800,600,0.0,0.0,1,20040311
4,4,Headset Ball Bearings,BE-2908,0,0,800,600,0.0,0.0,0,20040311
5,316,Blade,BL-2036,1,0,800,600,0.0,0.0,1,20040311


##### 1.4. Create a New Table that Sources Employee Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_employees" that reads the "dim_employees" table from the
-- MySQL "adventureworks_dw" database over JDBC.
-- NOTE(review): the password is stored in plain text in this cell; consider a secrets manager.

CREATE OR REPLACE TEMPORARY VIEW view_employees
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://rvt9bx-mysql.mysql.database.azure.com:3306/adventureworks_dw", --Replace with your Server Name
  dbtable "dim_employees",
  user "rvt9bx",    --Replace with your User Name
  password "Ronddejambe2003"  --Replace with your password
)

In [0]:
%sql
USE DATABASE adventureworks_dlh;

-- Create a new table named "adventureworks_dlh.dim_employees" using data from the view named "view_employees".
-- The table files are written to the DBFS folder given by LOCATION.

CREATE OR REPLACE TABLE adventureworks_dlh.dim_employees
COMMENT "Employees Dimension Table"
LOCATION "dbfs:/FileStore/final-data/adventureworks_dlh/dim_employees"
AS SELECT * FROM view_employees

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Show the columns, data types and extended metadata of the employee dimension table.
DESCRIBE EXTENDED adventureworks_dlh.dim_employees;

col_name,data_type,comment
employee_key,bigint,
EmployeeID,bigint,
FirstName,varchar(65535),
LastName,varchar(65535),
Title,varchar(65535),
EmailAddress,varchar(65535),
Phone,varchar(65535),
MaritalStatus,varchar(65535),
Gender,varchar(65535),
hire_date_key,bigint,


In [0]:
%sql
-- Preview five rows of the employee dimension table.
-- NOTE(review): the rendered output contains employee names, e-mail addresses and phone numbers.
SELECT * FROM adventureworks_dlh.dim_employees LIMIT 5

employee_key,EmployeeID,FirstName,LastName,Title,EmailAddress,Phone,MaritalStatus,Gender,hire_date_key,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag,modified_date_key
1,1,Guy,Gilbert,Production Technician - WC60,guy1@adventure-works.com,320-555-0195,M,M,19960731,0,21,30,1,20040731
2,2,Kevin,Brown,Marketing Assistant,kevin0@adventure-works.com,150-555-0189,S,M,19970226,0,42,41,1,20040731
3,3,Roberto,Tamburello,Engineering Manager,roberto0@adventure-works.com,212-555-0187,M,M,19971212,1,2,21,1,20040731
4,4,Rob,Walters,Senior Tool Designer,rob0@adventure-works.com,612-555-0100,S,M,19980105,0,48,80,1,20040731
5,5,Thierry,D'Hers,Tool Designer,thierry0@adventure-works.com,168-555-0183,M,M,19980111,0,9,24,1,20040731


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # batch_dir resolves to 'dbfs:/FileStore/final-data/batch'

path,name,size,modificationTime
dbfs:/FileStore/final-data/batch/adventureworks_dim_shipmethod.json,adventureworks_dim_shipmethod.json,800,1715016214000
dbfs:/FileStore/final-data/batch/adventureworks_dim_vendors.csv,adventureworks_dim_vendors.csv,5773,1715016214000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent: it drops each target collection before re-inserting the documents from the JSON file.

In [0]:
# set_mongo_collection() reads the file with Python's built-in open(), so it needs a local
# filesystem path rather than a "dbfs:/" URI (presumably the /dbfs mount of batch_dir - confirm).
source_dir = '/dbfs/FileStore/final-data/batch'
# Maps the MongoDB collection name to the JSON file that populates it.
json_files = {"shipmethod" : 'adventureworks_dim_shipmethod.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f3a5bedb700>

##### 2.3.1 Fetch Ship Method Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// Connection settings for the MongoDB Atlas cluster, repeated here because Scala cells
// do not see the Python variables defined earlier in the notebook.
// NOTE(review): the password is stored in plain text; consider a secrets manager.
val userName = "rvt9bx"
val pwd = "Ronddejambe2003"
val clusterName = "cluster0.zrzzkeo"
// Connection string handed to the Spark reader in the next cell.
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

// Load the "shipmethod" collection of the "adventureworks_dw" database into a Spark DataFrame
// and keep only the listed ship-method columns.
val df_shipmethod = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "adventureworks_dw")
.option("collection", "shipmethod").load()
.select("shipmethod_key", "ShipMethodID", "Name", "ShipBase", "ShipRate", "modified_date_key")

display(df_shipmethod)

shipmethod_key,ShipMethodID,Name,ShipBase,ShipRate,modified_date_key
1,1,XRQ - TRUCK GROUND,3.95,0.99,19980601
2,2,ZY - EXPRESS,9.95,1.99,19980601
3,3,OVERSEAS - DELUXE,29.95,2.99,19980601
4,4,OVERNIGHT J-FAST,21.95,1.29,19980601
5,5,CARGO TRANSPORT 5,8.99,1.49,19980601


In [0]:
%scala
// Print the column names and types of the ship-method DataFrame.
df_shipmethod.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Ship Method Dimension Table in the Databricks Metadata Database (adventureworks_dlh)

In [0]:
%scala
// Save the DataFrame as the Delta table "adventureworks_dlh.dim_shipmethod";
// "overwrite" replaces the table contents when the cell is re-run.
df_shipmethod.write.format("delta").mode("overwrite").saveAsTable("adventureworks_dlh.dim_shipmethod")

In [0]:
%sql
-- Show the columns, data types and extended metadata of the ship-method dimension table.
DESCRIBE EXTENDED adventureworks_dlh.dim_shipmethod

col_name,data_type,comment
shipmethod_key,int,
ShipMethodID,int,
Name,string,
ShipBase,double,
ShipRate,double,
modified_date_key,int,
,,
# Delta Statistics Columns,,
Column Names,"ShipRate, modified_date_key, Name, ShipBase, shipmethod_key, ShipMethodID",
Column Selection Method,first-32,


In [0]:
%sql
-- Preview five rows of the ship-method dimension table.
SELECT * FROM adventureworks_dlh.dim_shipmethod LIMIT 5

shipmethod_key,ShipMethodID,Name,ShipBase,ShipRate,modified_date_key
1,1,XRQ - TRUCK GROUND,3.95,0.99,19980601
2,2,ZY - EXPRESS,9.95,1.99,19980601
3,3,OVERSEAS - DELUXE,29.95,2.99,19980601
4,4,OVERNIGHT J-FAST,21.95,1.29,19980601
5,5,CARGO TRANSPORT 5,8.99,1.49,19980601


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
# Location of the vendor dimension CSV inside the batch folder.
vendors_csv = f"{batch_dir}/adventureworks_dim_vendors.csv"

# Read the CSV with a header row and let Spark infer the column types.
df_vendors = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(vendors_csv)
)
display(df_vendors)

vendor_key,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,modified_date_key
1,1,INTERNAT0001,International,1,1,1,20020225
2,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,20020217
3,3,PREMIER0001,"Premier Sport, Inc.",1,1,1,20020305
4,4,COMFORT0001,Comfort Road Bicycles,1,1,1,20020124
5,5,METROSP0001,Metro Sport Equipment,1,1,1,20020301
6,6,GREENLA0001,Green Lake Bike Company,1,1,1,20020301
7,7,MOUNTAIN0001,Mountain Works,1,0,1,20020305
8,8,CONTINEN0001,Continental Pro Cycles,3,1,1,20020124
9,9,ADATUM0001,A. Datum Corporation,1,1,1,20020124
10,10,TREYRE0001,Trey Research,3,1,1,20020225


In [0]:
# Print the inferred column names and types of the vendor DataFrame.
df_vendors.printSchema()

root
 |-- vendor_key: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- CreditRating: integer (nullable = true)
 |-- PreferredVendorStatus: integer (nullable = true)
 |-- ActiveFlag: integer (nullable = true)
 |-- modified_date_key: integer (nullable = true)



In [0]:
# Save the DataFrame as the Delta table "adventureworks_dlh.dim_vendors";
# "overwrite" replaces the table contents when the cell is re-run.
df_vendors.write.format("delta").mode("overwrite").saveAsTable("adventureworks_dlh.dim_vendors")

In [0]:
%sql
-- Show the columns, data types and extended metadata of the vendor dimension table.
DESCRIBE EXTENDED adventureworks_dlh.dim_vendors;

col_name,data_type,comment
vendor_key,int,
VendorID,int,
AccountNumber,string,
Name,string,
CreditRating,int,
PreferredVendorStatus,int,
ActiveFlag,int,
modified_date_key,int,
,,
# Delta Statistics Columns,,


In [0]:
%sql
-- Preview five rows of the vendor dimension table.
SELECT * FROM adventureworks_dlh.dim_vendors LIMIT 5;

vendor_key,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,modified_date_key
1,1,INTERNAT0001,International,1,1,1,20020225
2,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,20020217
3,3,PREMIER0001,"Premier Sport, Inc.",1,1,1,20020305
4,4,COMFORT0001,Comfort Road Bicycles,1,1,1,20020124
5,5,METROSP0001,Metro Sport Equipment,1,1,1,20020301


##### Verify Dimension Tables

In [0]:
%sql
-- List the tables of the lakehouse database (plus the session's temporary views)
-- to confirm that every dimension table was created.
USE adventureworks_dlh;
SHOW TABLES

database,tableName,isTemporary
adventureworks_dlh,dim_date,False
adventureworks_dlh,dim_employees,False
adventureworks_dlh,dim_products,False
adventureworks_dlh,dim_shipmethod,False
adventureworks_dlh,dim_vendors,False
,view_date,True
,view_employees,True
,view_products,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Purchase Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Use spark.readStream and the AutoLoader ("cloudFiles" source) to read the JSON files in the
# "purchase_orders_stream_dir" directory and expose them as the TempView "purchase_orders_raw_tempview".
# The inferred schema is tracked under "cloudFiles.schemaLocation" (the bronze output directory).
# Column types are inferred ("cloudFiles.inferColumnTypes"); no schemaHints are supplied.
# "multiLine" lets a single JSON record span several lines of the file.

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", purchase_orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(purchase_orders_stream_dir)
 .createOrReplaceTempView("purchase_orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability:
   receipt_time = timestamp at which the row was processed,
   source_file  = path of the file the row was read from. */
CREATE OR REPLACE TEMPORARY VIEW purchase_orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM purchase_orders_raw_tempview
)

In [0]:
%sql
-- Preview the bronze view (it is backed by the streaming reader defined above).
SELECT * FROM purchase_orders_bronze_tempview

Freight,PurchaseOrderID,RevisionNumber,Status,SubTotal,TaxAmt,TotalDue,employee_key,modified_date_key,order_date_key,purchase_order_key,ship_date_key,shipmethod_key,vendor_key,_rescued_data,receipt_time,source_file
3.7698,2667,0,4,150.7905,12.0632,166.6235,164,20040528,20040519,2667,20040528,5,83,,2024-05-06T17:31:10.02Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_3.json
17.4132,2668,0,4,696.528,55.7222,769.6634,223,20040528,20040519,2668,20040528,5,32,,2024-05-06T17:31:10.02Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_3.json
221.1825,2669,0,4,8847.3,707.784,9776.2665,233,20040531,20040522,2669,20040531,2,38,,2024-05-06T17:31:10.02Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_3.json
4.2769,2670,0,4,171.0765,13.6861,189.0395,238,20040531,20040522,2670,20040531,5,85,,2024-05-06T17:31:10.02Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_3.json
509.9325,2671,0,4,20397.3,1631.784,22539.0165,261,20040531,20040522,2671,20040531,4,92,,2024-05-06T17:31:10.02Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_3.json
365.7019,2672,0,4,14628.075,1170.246,16164.0229,264,20040531,20040522,2672,20040531,3,11,,2024-05-06T17:31:10.02Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_3.json
633.3731,2673,0,4,25334.925,2026.794,27995.0921,244,20040531,20040522,2673,20040531,4,84,,2024-05-06T17:31:10.02Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_3.json
6.9631,2674,0,4,278.523,22.2818,307.7679,231,20040531,20040522,2674,20040531,5,78,,2024-05-06T17:31:10.02Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_3.json
2.7767,2675,0,4,111.069,8.8855,122.7312,241,20040531,20040522,2675,20040531,4,74,,2024-05-06T17:31:10.02Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_3.json
44.9009,2676,0,4,1796.0355,143.6828,1984.6192,266,20040531,20040522,2676,20040531,5,13,,2024-05-06T17:31:10.02Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_3.json


In [0]:
# Start a streaming query that appends the bronze view to the Delta table
# "fact_purchase_orders_bronze"; progress is tracked in the checkpoint folder.
# The table name is unqualified - presumably it resolves to adventureworks_dlh via the
# earlier USE statements (confirm).
# NOTE(review): no trigger is set and the cell does not wait for the query, so later cells
# may run before any data has been written - verify when using "Run All".
(spark.table("purchase_orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f3a59db1990>

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Read the bronze Delta table as a stream and expose it as the TempView
# "purchase_orders_silver_tempview" so that it can be joined to the dimensions in SQL.
(spark.readStream
  .table("fact_purchase_orders_bronze")
  .createOrReplaceTempView("purchase_orders_silver_tempview"))

In [0]:
%sql
-- Preview the rows streamed from the bronze table.
SELECT * FROM purchase_orders_silver_tempview

Freight,PurchaseOrderID,RevisionNumber,Status,SubTotal,TaxAmt,TotalDue,employee_key,modified_date_key,order_date_key,purchase_order_key,ship_date_key,shipmethod_key,vendor_key,_rescued_data,receipt_time,source_file
5.026,1,0,4,201.04,16.0832,222.1492,244,20010526,20010517,1,20010526,3,83,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
6.8025,2,0,1,272.1015,21.7681,300.6721,231,20010526,20010517,2,20010526,5,32,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
221.1825,3,0,4,8847.3,707.784,9776.2665,241,20010526,20010517,3,20010526,2,38,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
4.2769,4,0,3,171.0765,13.6861,189.0395,266,20010526,20010517,4,20010526,5,85,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
509.9325,5,0,4,20397.3,1631.784,22539.0165,164,20010609,20010531,5,20010609,4,92,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
365.7019,6,0,4,14628.075,1170.246,16164.0229,223,20010609,20010531,6,20010609,3,11,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
1467.1388,7,0,4,58685.55,4694.844,64847.5328,233,20010609,20010531,7,20010609,3,84,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
17.3345,8,0,4,693.378,55.4702,766.1827,238,20010609,20010531,8,20010609,5,78,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
17.3541,9,1,4,694.1655,55.5332,767.0528,261,20020123,20020114,9,20020123,5,74,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
44.9009,10,0,4,1796.0355,143.6828,1984.6192,274,20020123,20020114,10,20020123,5,13,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json


In [0]:
%sql
-- Show the column names and data types that the silver view inherits from the bronze table.
DESCRIBE EXTENDED purchase_orders_silver_tempview

col_name,data_type,comment
Freight,double,
PurchaseOrderID,bigint,
RevisionNumber,bigint,
Status,bigint,
SubTotal,double,
TaxAmt,double,
TotalDue,double,
employee_key,bigint,
modified_date_key,bigint,
order_date_key,bigint,


In [0]:
%sql
-- Build the Temporary View "fact_purchase_orders_silver_tempview": every row of
-- "purchase_orders_silver_tempview" enriched with descriptive attributes from the
-- Employee, Ship Method, Vendor and Date dimension tables.
-- The Date dimension acts as a "Role Playing" dimension: it is joined three times, once each
-- for the modified, order and ship date keys.
-- Employee, Ship Method and Vendor are inner joins (orders without a match are excluded);
-- the three Date lookups are left joins (date attributes are NULL when no date row matches).

CREATE OR REPLACE TEMPORARY VIEW fact_purchase_orders_silver_tempview AS (
  SELECT
    po.purchase_order_key,
    po.PurchaseOrderID,
    po.RevisionNumber,
    po.Status,
    po.SubTotal,
    po.TaxAmt,
    po.Freight,
    po.TotalDue,
    -- Employee who placed the order
    po.employee_key,
    CONCAT(emp.FirstName, ' ', emp.LastName) AS employee_name,
    -- Modified date role
    po.modified_date_key,
    mdt.day_name_of_week AS modified_day_name_of_week,
    mdt.day_of_month     AS modified_day_of_month,
    mdt.weekday_weekend  AS modified_weekday_weekend,
    mdt.month_name       AS modified_month_name,
    mdt.calendar_quarter AS modified_quarter,
    mdt.calendar_year    AS modified_year,
    -- Order date role
    po.order_date_key,
    odt.day_name_of_week AS order_day_name_of_week,
    odt.day_of_month     AS order_day_of_month,
    odt.weekday_weekend  AS order_weekday_weekend,
    odt.month_name       AS order_month_name,
    odt.calendar_quarter AS order_quarter,
    odt.calendar_year    AS order_year,
    -- Ship date role
    po.ship_date_key,
    sdt.day_name_of_week AS ship_day_name_of_week,
    sdt.day_of_month     AS ship_day_of_month,
    sdt.weekday_weekend  AS ship_weekday_weekend,
    sdt.month_name       AS ship_month_name,
    sdt.calendar_quarter AS ship_quarter,
    sdt.calendar_year    AS ship_year,
    -- Ship method and vendor
    po.shipmethod_key,
    shp.Name AS shipmethod_name,
    po.vendor_key,
    vnd.Name AS vendor_name,
    -- Ingestion metadata carried over from bronze
    po._rescued_data,
    po.receipt_time,
    po.source_file
  FROM purchase_orders_silver_tempview AS po
    INNER JOIN adventureworks_dlh.dim_employees AS emp
      ON po.employee_key = emp.employee_key
    INNER JOIN adventureworks_dlh.dim_shipmethod AS shp
      ON po.shipmethod_key = shp.shipmethod_key
    INNER JOIN adventureworks_dlh.dim_vendors AS vnd
      ON po.vendor_key = vnd.vendor_key
    LEFT JOIN adventureworks_dlh.dim_date AS mdt
      ON po.modified_date_key = mdt.date_key
    LEFT JOIN adventureworks_dlh.dim_date AS odt
      ON po.order_date_key = odt.date_key
    LEFT JOIN adventureworks_dlh.dim_date AS sdt
      ON po.ship_date_key = sdt.date_key
)






In [0]:
# Start a streaming query that appends the joined silver view to the Delta table
# "fact_purchase_orders_silver"; progress is tracked in the checkpoint folder.
# The table name is unqualified - presumably it resolves to adventureworks_dlh via the
# earlier USE statements (confirm).
# NOTE(review): no trigger is set and the cell does not wait for the query, so the cells
# that read this table may run before any data has been written - verify when using "Run All".
(spark.table("fact_purchase_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f3a59db1a80>

In [0]:
%sql
-- Preview the silver fact table written by the streaming query above.
-- NOTE(review): this presumably needs the stream to have committed at least one batch first.
SELECT * FROM fact_purchase_orders_silver

purchase_order_key,PurchaseOrderID,RevisionNumber,Status,SubTotal,TaxAmt,Freight,TotalDue,employee_key,employee_name,modified_date_key,modified_day_name_of_week,modified_day_of_month,modified_weekday_weekend,modified_month_name,modified_quarter,modified_year,order_date_key,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,ship_date_key,ship_day_name_of_week,ship_day_of_month,ship_weekday_weekend,ship_month_name,ship_quarter,ship_year,shipmethod_key,shipmethod_name,vendor_key,vendor_name,_rescued_data,receipt_time,source_file
1,1,0,4,201.04,16.0832,5.026,222.1492,244,Erin Hagens,20010526,Saturday,26,Weekend,May,2,2001,20010517,Thursday,17,Weekday,May,2,2001,20010526,Saturday,26,Weekend,May,2,2001,3,OVERSEAS - DELUXE,83,"Litware, Inc.",,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
2,2,0,1,272.1015,21.7681,6.8025,300.6721,231,Fukiko Ogisu,20010526,Saturday,26,Weekend,May,2,2001,20010517,Thursday,17,Weekday,May,2,2001,20010526,Saturday,26,Weekend,May,2,2001,5,CARGO TRANSPORT 5,32,Advanced Bicycles,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
3,3,0,4,8847.3,707.784,221.1825,9776.2665,241,Eric Kurjan,20010526,Saturday,26,Weekend,May,2,2001,20010517,Thursday,17,Weekday,May,2,2001,20010526,Saturday,26,Weekend,May,2,2001,2,ZY - EXPRESS,38,Allenson Cycles,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
4,4,0,3,171.0765,13.6861,4.2769,189.0395,266,Reinout Hillmann,20010526,Saturday,26,Weekend,May,2,2001,20010517,Thursday,17,Weekday,May,2,2001,20010526,Saturday,26,Weekend,May,2,2001,5,CARGO TRANSPORT 5,85,American Bicycles and Wheels,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
5,5,0,4,20397.3,1631.784,509.9325,22539.0165,164,Mikael Sandberg,20010609,Saturday,9,Weekend,June,2,2001,20010531,Thursday,31,Weekday,May,2,2001,20010609,Saturday,9,Weekend,June,2,2001,4,OVERNIGHT J-FAST,92,American Bikes,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
6,6,0,4,14628.075,1170.246,365.7019,16164.0229,223,Linda Meisner,20010609,Saturday,9,Weekend,June,2,2001,20010531,Thursday,31,Weekday,May,2,2001,20010609,Saturday,9,Weekend,June,2,2001,3,OVERSEAS - DELUXE,11,Anderson's Custom Bikes,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
7,7,0,4,58685.55,4694.844,1467.1388,64847.5328,233,Gordon Hee,20010609,Saturday,9,Weekend,June,2,2001,20010531,Thursday,31,Weekday,May,2,2001,20010609,Saturday,9,Weekend,June,2,2001,3,OVERSEAS - DELUXE,84,"Proseware, Inc.",,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
8,8,0,4,693.378,55.4702,17.3345,766.1827,238,Frank Pellow,20010609,Saturday,9,Weekend,June,2,2001,20010531,Thursday,31,Weekday,May,2,2001,20010609,Saturday,9,Weekend,June,2,2001,5,CARGO TRANSPORT 5,78,Aurora Bike Center,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
9,9,1,4,694.1655,55.5332,17.3541,767.0528,261,Ben Miller,20020123,Wednesday,23,Weekday,January,1,2002,20020114,Monday,14,Weekday,January,1,2002,20020123,Wednesday,23,Weekday,January,1,2002,5,CARGO TRANSPORT 5,74,Australia Bike Retailer,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json
10,10,0,4,1796.0355,143.6828,44.9009,1984.6192,274,Sheela Word,20020123,Wednesday,23,Weekday,January,1,2002,20020114,Monday,14,Weekday,January,1,2002,20020123,Wednesday,23,Weekday,January,1,2002,5,CARGO TRANSPORT 5,13,Beaumont Bikes,,2024-05-06T17:31:48.26Z,dbfs:/FileStore/final-data/stream/adventureworks_fact_purchase_orders_1.json


In [0]:
%sql
-- Show the columns, data types and extended metadata of the silver fact table.
DESCRIBE EXTENDED fact_purchase_orders_silver

col_name,data_type,comment
purchase_order_key,bigint,
PurchaseOrderID,bigint,
RevisionNumber,bigint,
Status,bigint,
SubTotal,double,
TaxAmt,double,
Freight,double,
TotalDue,double,
employee_key,bigint,
employee_name,string,


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS (CREATE TABLE AS SELECT) approach. The table contains the total amount due, summed across all purchase orders, for each vendor.

In [0]:
%sql
-- Gold table: total amount due per vendor, rounded to two decimals, built from a snapshot
-- of the silver fact table using CREATE TABLE AS SELECT (CTAS).

CREATE OR REPLACE TABLE adventureworks_dlh.total_due_per_vendor_gold AS (
  SELECT vendor_name AS VendorCompanyName,
  ROUND(SUM(TotalDue),2) AS TotalDue
  FROM adventureworks_dlh.fact_purchase_orders_silver
  GROUP BY VendorCompanyName
  ORDER BY TotalDue DESC);

-- Rows read back from a table have no guaranteed order, so the ranking is requested
-- explicitly here rather than relying on the ORDER BY used when the table was written.
SELECT * FROM adventureworks_dlh.total_due_per_vendor_gold
ORDER BY TotalDue DESC;


VendorCompanyName,TotalDue
Superior Bicycles,3624672.05
Chicago City Saddles,2616555.19
Professional Athletic Consultants,2477122.15
Jackson Authority,2067514.45
"Vision Cycles, Inc.",1973906.06
Greenwood Athletic Company,1890941.8
Crowley Sport,1890941.8
Sport Fan Co.,1852538.69
"Proseware, Inc.",1843726.01
Mitchell Sports,1793970.43


#### 7.0. Clean up the File System

In [0]:
# Stop every streaming query that is still running in this Spark session first; otherwise the
# bronze/silver writers would keep running while their source, checkpoint and table files
# are deleted underneath them.
for query in spark.streams.active:
    query.stop()

# Recursively delete the project folder (same target as `%fs rm -r /FileStore/final-data/`).
# NOTE: this also removes the source "batch" and "stream" files, so they must be uploaded
# again before the notebook can be re-run.
dbutils.fs.rm("/FileStore/final-data/", True)