## Final Project: Data Lakehouse with Structured Streaming
The goal of the second data project, building upon the first project, is to further demonstrate (1) an understanding of and (2) competence creating and implementing basic data science systems such as pipelines, scripts, data transformations, APIs, databases and cloud services. This project has been submitted in my GitHub Repo (https://github.com/racheljmuppidi/Final-Project) and file drop on Canvas.   

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
%pip install pymongo

import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "Azure Connection.mysql.database.azure.com"
jdbc_port = 3306
src_database = "classicmodels_dw2"

connection_properties = {
  "user" : "rjm5yf",
  "password" : "Rach!uva2023",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.p7ukejh"
atlas_database_name = "classicmodels_dw2"
atlas_user_name = "rjm5yf"
atlas_password = "ds2002"

# Data Files (JSON) Information ###############################
dst_database = "northwind_dlh"

base_dir = "dbfs:/FileStore/final-project"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/models"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Date Dimension and Products Reference Data from an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS classicmodels_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS classicmodels_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/final-project/classicmodels_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://rjm5yf-mysql.mysql.database.azure.com:3306/classicmodels_dw2", --Replace with your Server Name
  dbtable "dim_date",
  user "rjm5yf",    --Replace with your User Name
  password "Rach!uva2023"  --Replace with your password
)

In [0]:
%sql
USE DATABASE classicmodels_dlh;

CREATE OR REPLACE TABLE classicmodels_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final-project/classicmodels_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_product" that extracts data from your MySQL Northwind database.

CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://rjm5yf-mysql.mysql.database.azure.com:3306/classicmodels_dw2", --Replace with your Server Name
  dbtable "dim_products",
  user "rjm5yf",    --Replace with your User Name
  password "Rach!uva2023"  --Replace with you password
)

In [0]:
%sql
USE DATABASE classicmodels_dlh;

-- Create a new table named "classicmodels_dlh.dim_product" using data from the view named "view_product"

CREATE OR REPLACE TABLE classicmodels_dlh.dim_product
COMMENT "Products Dimension Table"
LOCATION "dbfs:/FileStore/final-project/classicmodels_dlh/dim_product"
AS SELECT * FROM view_product

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_product;

col_name,data_type,comment
product_key,bigint,
productCode,varchar(65535),
productName,varchar(65535),
productLine,varchar(65535),
productScale,varchar(65535),
productVendor,varchar(65535),
quantityInStock,bigint,
buyPrice,double,
MSRP,double,
,,


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_product LIMIT 5

product_key,productCode,productName,productLine,productScale,productVendor,quantityInStock,buyPrice,MSRP
1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,7933,48.81,95.7
2,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,7305,98.58,214.3
3,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,1:10,Highway 66 Mini Classics,6625,68.99,118.94
4,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,5582,91.02,193.66
5,S10_4757,1972 Alfa Romeo GTA,Classic Cars,1:10,Motor City Art Classics,3252,85.68,136.0


#### 2.0. Fetch Customer Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/final-project/models/batch'

path,name,size,modificationTime
dbfs:/FileStore/final-project/models/batch/classicmodels_dim_customers.csv,classicmodels_dim_customers.csv,12348,1715128117000
dbfs:/FileStore/final-project/models/batch/classicmodels_dim_customers.json,classicmodels_dim_customers.json,41177,1715128876000
dbfs:/FileStore/final-project/models/batch/classicmodels_dim_productLines.csv,classicmodels_dim_productLines.csv,3446,1715129599000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/final-project/models/batch'
json_files = {"customers" : 'classicmodels_dim_customers.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 



<pymongo.results.InsertManyResult at 0x7f9734eb78c0>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "rjm5yf"
val pwd = "ds2002"
val clusterName = "cluster0.p7ukejh"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "classicmodels_dw2")
.option("collection", "customers")
.option("uri", atlas_uri).load()
.select("customer_key","customerNumber","customerName","contactLastName","contactFirstName","addressLine1","city","postalCode","country","salesRepEmployeeNumber","creditLimit")

display(df_customer)

customer_key,customerNumber,customerName,contactLastName,contactFirstName,addressLine1,city,postalCode,country,salesRepEmployeeNumber,creditLimit
1,103,Atelier graphique,Schmitt,Carine,"54, rue Royale",Nantes,44000,France,1370.0,21000
2,112,Signal Gift Stores,King,Jean,8489 Strong St.,Las Vegas,83030,USA,1166.0,71800
3,114,"Australian Collectors, Co.",Ferguson,Peter,636 St Kilda Road,Melbourne,3004,Australia,1611.0,117300
4,119,La Rochelle Gifts,Labrune,Janine,"67, rue des Cinquante Otages",Nantes,44000,France,1370.0,118200
5,121,Baane Mini Imports,Bergulfsen,Jonas,Erling Skakkes gate 78,Stavern,4110,Norway,1504.0,81700
6,124,Mini Gifts Distributors Ltd.,Nelson,Susan,5677 Strong St.,San Rafael,97562,USA,1165.0,210500
7,125,Havel & Zbyszek Co,Piestrzeniewicz,Zbyszek,ul. Filtrowa 68,Warszawa,01-012,Poland,,0
8,128,"Blauer See Auto, Co.",Keitel,Roland,Lyonerstr. 34,Frankfurt,60528,Germany,1504.0,59700
9,129,Mini Wheels Co.,Murphy,Julie,5557 North Pendale Street,San Francisco,94217,USA,1165.0,64600
10,131,Land of Toys Inc.,Lee,Kwai,897 Long Airport Avenue,NYC,10022,USA,1323.0,114900


In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
customerNumber,int,
customerName,string,
contactLastName,string,
contactFirstName,string,
addressLine1,string,
city,string,
postalCode,string,
country,string,
salesRepEmployeeNumber,int,


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_customer LIMIT 5

customer_key,customerNumber,customerName,contactLastName,contactFirstName,addressLine1,city,postalCode,country,salesRepEmployeeNumber,creditLimit
1,103,Atelier graphique,Schmitt,Carine,"54, rue Royale",Nantes,44000,France,1370,21000
2,112,Signal Gift Stores,King,Jean,8489 Strong St.,Las Vegas,83030,USA,1166,71800
3,114,"Australian Collectors, Co.",Ferguson,Peter,636 St Kilda Road,Melbourne,3004,Australia,1611,117300
4,119,La Rochelle Gifts,Labrune,Janine,"67, rue des Cinquante Otages",Nantes,44000,France,1370,118200
5,121,Baane Mini Imports,Bergulfsen,Jonas,Erling Skakkes gate 78,Stavern,4110,Norway,1504,81700


#### 3.0. Fetch ProductLines Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
productLine_csv = f"{batch_dir}/classicmodels_dim_productLines.csv"

df_productLine = spark.read.format('csv').options(header='true', inferSchema='true').load(productLine_csv)
display(df_productLine)

productline_key,productLine,textDescription
1,Classic Cars,"Attention car enthusiasts: Make your wildest car ownership dreams come true. Whether you are looking for classic muscle cars, dream sports cars or movie-inspired miniatures, you will find great choices in this category. These replicas feature superb attention to detail and craftsmanship and offer features such as working steering system, opening forward compartment, opening rear trunk with removable spare wheel, 4-wheel independent spring suspension, and so on. The models range in size from 1:10 to 1:24 scale and include numerous limited edition and several out-of-production vehicles. All models include a certificate of authenticity from their manufacturers and come fully assembled and ready for display in the home or office."
2,Motorcycles,"Our motorcycles are state of the art replicas of classic as well as contemporary motorcycle legends such as Harley Davidson, Ducati and Vespa. Models contain stunning details such as official logos, rotating wheels, working kickstand, front suspension, gear-shift lever, footbrake lever, and drive chain. Materials used include diecast and plastic. The models range in size from 1:10 to 1:50 scale and include numerous limited edition and several out-of-production vehicles. All models come fully assembled and ready for display in the home or office. Most include a certificate of authenticity."
3,Planes,"Unique, diecast airplane and helicopter replicas suitable for collections, as well as home, office or classroom decorations. Models contain stunning details such as official logos and insignias, rotating jet engines and propellers, retractable wheels, and so on. Most come fully assembled and with a certificate of authenticity from their manufacturers."
4,Ships,"The perfect holiday or anniversary gift for executives, clients, friends, and family. These handcrafted model ships are unique, stunning works of art that will be treasured for generations! They come fully assembled and ready for display in the home or office. We guarantee the highest quality, and best value."
5,Trains,"Model trains are a rewarding hobby for enthusiasts of all ages. Whether you're looking for collectible wooden trains, electric streetcars or locomotives, you'll find a number of great choices for any budget within this category. The interactive aspect of trains makes toy trains perfect for young children. The wooden train sets are ideal for children under the age of 5."
6,Trucks and Buses,"The Truck and Bus models are realistic replicas of buses and specialized trucks produced from the early 1920s to present. The models range in size from 1:12 to 1:50 scale and include numerous limited edition and several out-of-production vehicles. Materials used include tin, diecast and plastic. All models include a certificate of authenticity from their manufacturers and are a perfect ornament for the home and office."
7,Vintage Cars,"Our Vintage Car models realistically portray automobiles produced from the early 1900s through the 1940s. Materials used include Bakelite, diecast, plastic and wood. Most of the replicas are in the 1:18 and 1:24 scale sizes, which provide the optimum in detail and accuracy. Prices range from $30.00 up to $180.00 for some special limited edition replicas. All models include a certificate of authenticity from their manufacturers and come fully assembled and ready for display in the home or office."


In [0]:
df_productLine.printSchema()

root
 |-- productline_key: integer (nullable = true)
 |-- productLine: string (nullable = true)
 |-- textDescription: string (nullable = true)



In [0]:
df_productLine.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_productLine")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_productLine;

col_name,data_type,comment
productline_key,int,
productLine,string,
textDescription,string,
,,
# Delta Statistics Columns,,
Column Names,"productline_key, productLine, textDescription",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_productLine LIMIT 5;

productline_key,productLine,textDescription
1,Classic Cars,"Attention car enthusiasts: Make your wildest car ownership dreams come true. Whether you are looking for classic muscle cars, dream sports cars or movie-inspired miniatures, you will find great choices in this category. These replicas feature superb attention to detail and craftsmanship and offer features such as working steering system, opening forward compartment, opening rear trunk with removable spare wheel, 4-wheel independent spring suspension, and so on. The models range in size from 1:10 to 1:24 scale and include numerous limited edition and several out-of-production vehicles. All models include a certificate of authenticity from their manufacturers and come fully assembled and ready for display in the home or office."
2,Motorcycles,"Our motorcycles are state of the art replicas of classic as well as contemporary motorcycle legends such as Harley Davidson, Ducati and Vespa. Models contain stunning details such as official logos, rotating wheels, working kickstand, front suspension, gear-shift lever, footbrake lever, and drive chain. Materials used include diecast and plastic. The models range in size from 1:10 to 1:50 scale and include numerous limited edition and several out-of-production vehicles. All models come fully assembled and ready for display in the home or office. Most include a certificate of authenticity."
3,Planes,"Unique, diecast airplane and helicopter replicas suitable for collections, as well as home, office or classroom decorations. Models contain stunning details such as official logos and insignias, rotating jet engines and propellers, retractable wheels, and so on. Most come fully assembled and with a certificate of authenticity from their manufacturers."
4,Ships,"The perfect holiday or anniversary gift for executives, clients, friends, and family. These handcrafted model ships are unique, stunning works of art that will be treasured for generations! They come fully assembled and ready for display in the home or office. We guarantee the highest quality, and best value."
5,Trains,"Model trains are a rewarding hobby for enthusiasts of all ages. Whether you're looking for collectible wooden trains, electric streetcars or locomotives, you'll find a number of great choices for any budget within this category. The interactive aspect of trains makes toy trains perfect for young children. The wooden train sets are ideal for children under the age of 5."


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")

 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

comments,customerNumber,fact_order_key,orderDate_key,orderLineNumber,orderNumber,priceEach,product_key,quantityOrdered,requiredDate_key,shippedDate,status,_rescued_data,receipt_time,source_file
,319,999,20031125,8,10195,51.95,88,32,20031201,2003-11-28,Shipped,,2024-05-11T03:08:12.387Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
Check on availability.,495,1000,20031209,9,10207,51.95,88,27,20031217,2003-12-11,Shipped,,2024-05-11T03:08:12.387Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,487,1001,20040210,4,10219,47.62,88,35,20040217,2004-02-12,Shipped,,2024-05-11T03:08:12.387Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,124,1002,20040311,3,10229,49.78,88,23,20040320,2004-03-12,Shipped,,2024-05-11T03:08:12.387Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,141,1003,20040505,7,10246,45.45,88,35,20040513,2004-05-06,Shipped,,2024-05-11T03:08:12.387Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,166,1004,20040615,6,10259,45.99,88,40,20040622,2004-06-17,Shipped,,2024-05-11T03:08:12.387Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,124,1005,20040720,7,10271,51.95,88,35,20040729,2004-07-23,Shipped,,2024-05-11T03:08:12.387Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,157,1006,20040819,3,10281,44.91,88,31,20040828,2004-08-23,Shipped,,2024-05-11T03:08:12.387Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
They want to reevaluate their terms agreement with Finance.,131,1007,20040908,10,10292,54.11,88,50,20040918,2004-09-11,Shipped,,2024-05-11T03:08:12.387Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
Check on availability.,286,1008,20041013,7,10305,48.7,88,40,20041022,2004-10-15,Shipped,,2024-05-11T03:08:12.387Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f970cef2a10>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

comments,customerNumber,fact_order_key,orderDate_key,orderLineNumber,orderNumber,priceEach,product_key,quantityOrdered,requiredDate_key,shippedDate,status,_rescued_data,receipt_time,source_file
,319,999,20031125,8,10195,51.95,88,32,20031201,2003-11-28,Shipped,,2024-05-11T03:08:17.14Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
Check on availability.,495,1000,20031209,9,10207,51.95,88,27,20031217,2003-12-11,Shipped,,2024-05-11T03:08:17.14Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,487,1001,20040210,4,10219,47.62,88,35,20040217,2004-02-12,Shipped,,2024-05-11T03:08:17.14Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,124,1002,20040311,3,10229,49.78,88,23,20040320,2004-03-12,Shipped,,2024-05-11T03:08:17.14Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,141,1003,20040505,7,10246,45.45,88,35,20040513,2004-05-06,Shipped,,2024-05-11T03:08:17.14Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,166,1004,20040615,6,10259,45.99,88,40,20040622,2004-06-17,Shipped,,2024-05-11T03:08:17.14Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,124,1005,20040720,7,10271,51.95,88,35,20040729,2004-07-23,Shipped,,2024-05-11T03:08:17.14Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
,157,1006,20040819,3,10281,44.91,88,31,20040828,2004-08-23,Shipped,,2024-05-11T03:08:17.14Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
They want to reevaluate their terms agreement with Finance.,131,1007,20040908,10,10292,54.11,88,50,20040918,2004-09-11,Shipped,,2024-05-11T03:08:17.14Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json
Check on availability.,286,1008,20041013,7,10305,48.7,88,40,20041022,2004-10-15,Shipped,,2024-05-11T03:08:17.14Z,dbfs:/FileStore/final-project/models/stream/orders/classicmodels_fact_orders_2.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
comments,string,
customerNumber,bigint,
fact_order_key,bigint,
orderDate_key,bigint,
orderLineNumber,bigint,
orderNumber,bigint,
priceEach,double,
product_key,bigint,
quantityOrdered,bigint,
requiredDate_key,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key, 
	  o.product_key,
    p.productCode AS product_code,
    p.productName AS product_name,
    p.productLine AS product_line,
    p.productScale AS product_scale,
    p.productVendor AS product_vendor,
    p.quantityInStock AS product_quantity,
    p.buyPrice AS product_price,
    p.MSRP AS product_msrp,

    o.customerNumber AS customer_key,
    c.customerName AS customer_name,
    c.contactLastName AS last_name,
    c.contactFirstName AS first_name,
    c.addressLine1 AS address_line,
    c.city AS city,
    c.postalCode AS postal_code,
    c.country AS country,
    c.creditLimit AS credit_limit,
    
    od.day_name_of_week AS order_day_name_of_week,
    od.day_of_month AS order_day_of_month,
    od.weekday_weekend AS order_weekday_weekend,
    od.month_name AS order_month_name,
    od.calendar_quarter AS order_quarter,
    od.calendar_year AS order_year,

    o.orderNumber,
    o.status,
    o.comments,
    o.quantityOrdered,
    o.priceEach,
    o.orderLineNumber

  FROM orders_silver_tempview AS o
  INNER JOIN classicmodels_dlh.dim_product as p
  ON p.product_key = o.product_key
  INNER JOIN classicmodels_dlh.dim_customer as c
  ON c.customerNumber = o.customerNumber
  LEFT OUTER JOIN classicmodels_dlh.dim_date AS od
  ON od.date_key = o.orderDate_key
)


In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f970cd00850>

In [0]:
%sql
SELECT * FROM fact_orders_silver

fact_order_key,product_key,product_code,product_name,product_line,product_scale,product_vendor,product_quantity,product_price,product_msrp,customer_key,customer_name,last_name,first_name,address_line,city,postal_code,country,credit_limit,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,orderNumber,status,comments,quantityOrdered,priceEach,orderLineNumber
999,88,S32_2509,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,2874,25.98,54.11,319,Mini Classics,Frick,Steve,3758 North Pendale Street,White Plains,24067,USA,102700,Tuesday,25,Weekday,November,4,2003,10195,Shipped,,32,51.95,8
1000,88,S32_2509,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,2874,25.98,54.11,495,Diecast Collectables,Franco,Valarie,6251 Ingle Ln.,Boston,51003,USA,85100,Tuesday,9,Weekday,December,4,2003,10207,Shipped,Check on availability.,27,51.95,9
1001,88,S32_2509,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,2874,25.98,54.11,487,Signal Collectibles Ltd.,Taylor,Sue,2793 Furth Circle,Brisbane,94217,USA,60300,Tuesday,10,Weekday,February,1,2004,10219,Shipped,,35,47.62,4
1002,88,S32_2509,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,2874,25.98,54.11,124,Mini Gifts Distributors Ltd.,Nelson,Susan,5677 Strong St.,San Rafael,97562,USA,210500,Thursday,11,Weekday,March,1,2004,10229,Shipped,,23,49.78,3
1003,88,S32_2509,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,2874,25.98,54.11,141,Euro+ Shopping Channel,Freyre,Diego,"C/ Moralzarzal, 86",Madrid,28034,Spain,227600,Wednesday,5,Weekday,May,2,2004,10246,Shipped,,35,45.45,7
1004,88,S32_2509,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,2874,25.98,54.11,166,Handji Gifts& Co,Victorino,Wendy,106 Linden Road Sandown,Singapore,069045,Singapore,97900,Tuesday,15,Weekday,June,2,2004,10259,Shipped,,40,45.99,6
1005,88,S32_2509,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,2874,25.98,54.11,124,Mini Gifts Distributors Ltd.,Nelson,Susan,5677 Strong St.,San Rafael,97562,USA,210500,Tuesday,20,Weekday,July,3,2004,10271,Shipped,,35,51.95,7
1006,88,S32_2509,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,2874,25.98,54.11,157,Diecast Classics Inc.,Leong,Kelvin,7586 Pompton St.,Allentown,70267,USA,100600,Thursday,19,Weekday,August,3,2004,10281,Shipped,,31,44.91,3
1007,88,S32_2509,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,2874,25.98,54.11,131,Land of Toys Inc.,Lee,Kwai,897 Long Airport Avenue,NYC,10022,USA,114900,Wednesday,8,Weekday,September,3,2004,10292,Shipped,They want to reevaluate their terms agreement with Finance.,50,54.11,10
1008,88,S32_2509,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,2874,25.98,54.11,286,Marta's Replicas Co.,Hernandez,Marta,39323 Spinnaker Dr.,Cambridge,51247,USA,123700,Wednesday,13,Weekday,October,4,2004,10305,Shipped,Check on availability.,40,48.7,7


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
product_key,bigint,
product_code,varchar(65535),
product_name,varchar(65535),
product_line,varchar(65535),
product_scale,varchar(65535),
product_vendor,varchar(65535),
product_quantity,bigint,
product_price,double,
product_msrp,double,


##### 6.3. Gold Table: Perform Aggregations
I performed two aggregations below:

In [0]:
%sql
CREATE OR REPLACE TABLE classicmodels_dlh.fact_orders_gold_1 AS (
    SELECT p.productName AS product_name
        , SUM(fo.quantityOrdered) AS total_quantity
    FROM classicmodels_dlh.fact_orders_silver AS fo
    INNER JOIN classicmodels_dlh.dim_product AS p
    ON fo.product_key = p.product_key
    GROUP BY p.productName);


SELECT * FROM classicmodels_dlh.fact_orders_gold_1;




product_name,total_quantity
1936 Chrysler Airflow,983
The Titanic,952
18th Century Vintage Horse Carriage,907
1958 Setra Bus,972
Diamond T620 Semi-Skirted Tanker,979
2001 Ferrari Enzo,1019
The Queen Mary,896
1930 Buick Marquette Phaeton,1074
The Mayflower,898
1958 Chevy Corvette Limited Edition,983


In [0]:
%sql
CREATE OR REPLACE TABLE classicmodels_dlh.fact_orders_gold_2 AS (
    SELECT pl.productLine AS product_line
        , SUM(p.quantityInStock) AS quantity_in_stock
    FROM classicmodels_dlh.dim_product AS p
    INNER JOIN classicmodels_dlh.dim_productline AS pl
    ON pl.productline_key = p.product_key
    GROUP BY pl.productLine);


SELECT * FROM classicmodels_dlh.fact_orders_gold_2;


product_line,quantity_in_stock
Motorcycles,7305
Vintage Cars,68
Ships,5582
Trucks and Buses,6791
Classic Cars,7933
Trains,3252
Planes,6625


#### 9.0. Clean up the File System

In [0]:
#%fs rm -r /FileStore/lab_data/