## Final Project: Capstone
Building on my midterm project, I continued using the MySQL Sample Database (Link: https://www.mysqltutorial.org/getting-started-with-mysql/mysql-sample-database/) in which I renamed it to company. Using the company data warehouse, I was able to create date dimension and date customer in MySQL. I downloaded and used products as my JSON file and pushed it to MongoDB to create the product dimension. Then, I downloaded a csv file of employee and created the employee dimension in databrirks using the DBFS file system (Both the JSON and CSV file are found in my batch file). Three JSON files of the fact orders table  were then downloaded that were evenly split among the three JSON files (JSON files found in my stream file) to show the streaming data. Lastly, I created bronze, silver, and gold tables, to prove that it works.

#### Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "yda2zm-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "company_dw2"

connection_properties = {
  "user" : "tsoto",
  "password" : "Wat3r17g00d",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Lab04.iopdr"
atlas_database_name = "final_project"
atlas_user_name = "tatianaasoto17"
atlas_password = "Aut4mn31"

mongo_uri = f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net/{atlas_database_name}"
print(mongo_uri)

# Data Files (JSON) Information ###############################
dst_database = "company_dlh"
base_dir = "dbfs:/FileStore/ds2002-finalproject"
database_dir = f"{base_dir}/{dst_database}"

batch_dir = f"{base_dir}/batch"
stream_dir = f"{base_dir}/stream"

orders_stream_dir = f"{stream_dir}"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

mongodb+srv://tatianaasoto17:Aut4mn31@Lab04.iopdr.mongodb.net/final_project


True

#### Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### Fetch Reference Data From an Azure MySQL Database
##### Create a New Databricks Metadata Database named comapny_dlh.

In [0]:
%sql
DROP DATABASE IF EXISTS company_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS company_dlh
COMMENT "DS-2002 Final Project"
LOCATION "dbfs:/FileStore/ds2002-finalproject/company_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://yda2zm-mysql.mysql.database.azure.com:3306/company_dw2",
  dbtable "dim_date",
  user "tsoto",
  password "Wat3r17g00d"
)

In [0]:
%sql
USE DATABASE company_dlh;

CREATE OR REPLACE TABLE company_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-finalproject/company_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED company_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM company_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_customer" that extracts data from your MySQL company database.
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://yda2zm-mysql.mysql.database.azure.com:3306/company_dw2",
  dbtable "dim_customer",
  user "tsoto",
  password "Wat3r17g00d"
)

In [0]:
%sql
USE DATABASE company_dlh;

-- Create a new table named "company_dlh.dim_product" using data from the view named "view_customer"

CREATE OR REPLACE TABLE company_dlh.dim_customer
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-finalproject/company_dlh/dim_customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED company_dlh.dim_customer;

col_name,data_type,comment
customer_key,bigint,
customerName,varchar(65535),
phone,varchar(65535),
addressLine1,varchar(65535),
addressLine2,varchar(65535),
city,varchar(65535),
postalCode,varchar(65535),
country,varchar(65535),
creditLimit,double,
,,


In [0]:
%sql
SELECT * FROM company_dlh.dim_customer LIMIT 5

customer_key,customerName,phone,addressLine1,addressLine2,city,postalCode,country,creditLimit
103,Atelier graphique,40.32.2555,"54, rue Royale",,Nantes,44000,France,21000.0
112,Signal Gift Stores,7025551838,8489 Strong St.,,Las Vegas,83030,USA,71800.0
114,"Australian Collectors, Co.",03 9520 4555,636 St Kilda Road,Level 3,Melbourne,3004,Australia,117300.0
119,La Rochelle Gifts,40.67.8555,"67, rue des Cinquante Otages",,Nantes,44000,France,118200.0
121,Baane Mini Imports,07-98 9555,Erling Skakkes gate 78,,Stavern,4110,Norway,81700.0


#### Fetch Reference Data from a MongoDB Atlas Database
##### View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-finalproject/batch/company_employees.csv,company_employees.csv,1823,1733528139000
dbfs:/FileStore/ds2002-finalproject/batch/product.json,product.json,44812,1733528139000
dbfs:/FileStore/ds2002-finalproject/batch/products_company.json,products_company.json,113588,1733528139000


##### Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002-finalproject/batch'
json_files = {"product" : 'product.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f2945f7cd40>

In [0]:
%scala
import com.mongodb.spark._

val userName = "tatianaasoto17"
val pwd = "Aut4mn31"
val clusterName = "Lab04.iopdr"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

##### Fetch Product Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_product = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "final_project")
.option("collection", "product").load()
.select("product_key", "productLine", "productCode", "productName", "productDescription", "quantityInStock", "buyPrice", "MSRP")

display(df_product)

product_key,productLine,productCode,productName,productDescription,quantityInStock,buyPrice,MSRP
1,Classic Cars,S10_1949,1952 Alpine Renault 1300,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,7305,98.58,214.3
2,Classic Cars,S10_4757,1972 Alfa Romeo GTA,Features include: Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,3252,85.68,136.0
3,Classic Cars,S10_4962,1962 LanciaA Delta 16V,Features include: Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,6791,103.42,147.74
4,Classic Cars,S12_1099,1968 Ford Mustang,"Hood, doors and trunk all open to reveal highly detailed interior features. Steering wheel actually turns the front wheels. Color dark green.",68,95.34,194.57
5,Classic Cars,S12_1108,2001 Ferrari Enzo,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,3619,95.59,207.8
6,Classic Cars,S12_3148,1969 Corvair Monza,"1:18 scale die-cast about 10"" long doors open, hood opens, trunk opens and wheels roll",6906,89.14,151.08
7,Classic Cars,S12_3380,1968 Dodge Charger,"1:12 scale model of a 1968 Dodge Charger. Hood, doors and trunk all open to reveal highly detailed interior features. Steering wheel actually turns the front wheels. Color black",9123,75.16,117.44
8,Classic Cars,S12_3891,1969 Ford Falcon,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,1049,83.05,173.02
9,Classic Cars,S12_3990,1970 Plymouth Hemi Cuda,Very detailed 1970 Plymouth Cuda model in 1:12 scale. The Cuda is generally accepted as one of the fastest original muscle cars from the 1970s. This model is a reproduction of one of the orginal 652 cars built in 1970. Red color.,5663,31.92,79.8
10,Classic Cars,S12_4675,1969 Dodge Charger,Detailed model of the 1969 Dodge Charger. This model includes finely detailed interior and exterior features. Painted in red and white.,7323,58.73,115.16


In [0]:
%scala
df_product.printSchema()

##### Use the Spark DataFrame to Create a New Product Dimension Table in the Databricks Metadata Database (company_dlh)

In [0]:
%scala
df_product.write.format("delta").mode("overwrite").saveAsTable("company_dlh.dim_product")

In [0]:
%sql
DESCRIBE EXTENDED company_dlh.dim_product

col_name,data_type,comment
product_key,int,
productLine,string,
productCode,string,
productName,string,
productDescription,string,
quantityInStock,int,
buyPrice,double,
MSRP,double,
,,
# Delta Statistics Columns,,


In [0]:
%sql
SELECT * FROM company_dlh.dim_product LIMIT 5

product_key,productLine,productCode,productName,productDescription,quantityInStock,buyPrice,MSRP
1,Classic Cars,S10_1949,1952 Alpine Renault 1300,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,7305,98.58,214.3
2,Classic Cars,S10_4757,1972 Alfa Romeo GTA,Features include: Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,3252,85.68,136.0
3,Classic Cars,S10_4962,1962 LanciaA Delta 16V,Features include: Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,6791,103.42,147.74
4,Classic Cars,S12_1099,1968 Ford Mustang,"Hood, doors and trunk all open to reveal highly detailed interior features. Steering wheel actually turns the front wheels. Color dark green.",68,95.34,194.57
5,Classic Cars,S12_1108,2001 Ferrari Enzo,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,3619,95.59,207.8


#### Fetch Data from a File System
##### Use PySpark to Read From an Employee CSV File

In [0]:
employee_csv = f"{batch_dir}/company_employees.csv"

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
display(df_employee)

employeeNumber,lastName,firstName,email,jobTitle,officeCity,officeCountry
1002,Murphy,Diane,dmurphy@classicmodelcars.com,President,San Francisco,USA
1056,Patterson,Mary,mpatterso@classicmodelcars.com,VP Sales,San Francisco,USA
1076,Firrelli,Jeff,jfirrelli@classicmodelcars.com,VP Marketing,San Francisco,USA
1088,Patterson,William,wpatterson@classicmodelcars.com,Sales Manager (APAC),Sydney,Australia
1102,Bondur,Gerard,gbondur@classicmodelcars.com,Sale Manager (EMEA),Paris,France
1143,Bow,Anthony,abow@classicmodelcars.com,Sales Manager (NA),San Francisco,USA
1165,Jennings,Leslie,ljennings@classicmodelcars.com,Sales Rep,San Francisco,USA
1166,Thompson,Leslie,lthompson@classicmodelcars.com,Sales Rep,San Francisco,USA
1188,Firrelli,Julie,jfirrelli@classicmodelcars.com,Sales Rep,Boston,USA
1216,Patterson,Steve,spatterson@classicmodelcars.com,Sales Rep,Boston,USA


In [0]:
df_employee.printSchema()

root
 |-- employeeNumber: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- jobTitle: string (nullable = true)
 |-- officeCity: string (nullable = true)
 |-- officeCountry: string (nullable = true)



In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("company_dlh.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED company_dlh.dim_employee;

col_name,data_type,comment
employeeNumber,int,
lastName,string,
firstName,string,
email,string,
jobTitle,string,
officeCity,string,
officeCountry,string,
,,
# Delta Statistics Columns,,
Column Names,"email, officeCountry, lastName, firstName, employeeNumber, jobTitle, officeCity",


In [0]:
%sql
SELECT * FROM company_dlh.dim_employee LIMIT 5;

employeeNumber,lastName,firstName,email,jobTitle,officeCity,officeCountry
1002,Murphy,Diane,dmurphy@classicmodelcars.com,President,San Francisco,USA
1056,Patterson,Mary,mpatterso@classicmodelcars.com,VP Sales,San Francisco,USA
1076,Firrelli,Jeff,jfirrelli@classicmodelcars.com,VP Marketing,San Francisco,USA
1088,Patterson,William,wpatterson@classicmodelcars.com,Sales Manager (APAC),Sydney,Australia
1102,Bondur,Gerard,gbondur@classicmodelcars.com,Sale Manager (EMEA),Paris,France


In [0]:
%sql
USE company_dlh;
SHOW TABLES

database,tableName,isTemporary
company_dlh,dim_customer,False
company_dlh,dim_date,False
company_dlh,dim_employee,False
company_dlh,dim_product,False
,_sqldf,True
,display_query_1,True
,display_query_2,True
,display_query_3,True
,fact_orders_silver_tempview,True
,orders_bronze_tempview,True


### Integrate Reference Data with Real-Time Data
#### Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 .option("cloudFiles.schemaHints", "order_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT") 
 .option("cloudFiles.schemaHints", "productCode STRING")
 .option("cloudFiles.schemaHints", "quantityOrdered BIGINT")
 .option("cloudFiles.schemaHints", "priceEach BIGINT")
 .option("cloudFiles.schemaHints", "orderLineNumber BIGINT")
 .option("cloudFiles.schemaHints", "order_date_key BIGINT")
 .option("cloudFiles.schemaHints", "required_date_key BIGINT")
 .option("cloudFiles.schemaHints", "shipped_date_key BIGINT") 
 .option("cloudFiles.schemaHints", "product_key BIGINT")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

customer_key,fact_order_key,orderLineNumber,order_date_key,order_key,priceEach,productCode,product_key,quantityOrdered,required_date_key,shipped_date_key,_rescued_data,receipt_time,source_file
141,35,5,20030131,10104,106.45,S24_4048,35,26,20030209,20030201,,2024-12-08T23:50:13.252Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders3.json
141,36,11,20030131,10104,51.95,S32_2509,36,35,20030209,20030201,,2024-12-08T23:50:13.252Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders3.json
141,37,4,20030131,10104,56.55,S32_3207,37,49,20030209,20030201,,2024-12-08T23:50:13.252Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders3.json
141,38,7,20030131,10104,114.59,S50_1392,38,33,20030209,20030201,,2024-12-08T23:50:13.252Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders3.json
141,39,2,20030131,10104,53.31,S50_1514,39,32,20030209,20030201,,2024-12-08T23:50:13.252Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders3.json
145,40,2,20030211,10105,127.84,S10_4757,40,50,20030221,20030212,,2024-12-08T23:50:13.252Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders3.json
145,41,15,20030211,10105,205.72,S12_1108,41,41,20030221,20030212,,2024-12-08T23:50:13.252Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders3.json
145,42,14,20030211,10105,141.88,S12_3891,42,29,20030221,20030212,,2024-12-08T23:50:13.252Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders3.json
145,43,11,20030211,10105,136.59,S18_3140,43,22,20030221,20030212,,2024-12-08T23:50:13.252Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders3.json
145,44,13,20030211,10105,87.73,S18_3259,44,38,20030221,20030212,,2024-12-08T23:50:13.252Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders3.json


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f29447012d0>

##### Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

customer_key,fact_order_key,orderLineNumber,order_date_key,order_key,priceEach,productCode,product_key,quantityOrdered,required_date_key,shipped_date_key,_rescued_data,receipt_time,source_file
363,1,3,20030106,10100,136.0,S18_1749,1,30,20030113,20030110,,2024-12-08T23:50:27.879Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders1.json
363,2,2,20030106,10100,55.09,S18_2248,2,50,20030113,20030110,,2024-12-08T23:50:27.879Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders1.json
363,3,4,20030106,10100,75.46,S18_4409,3,22,20030113,20030110,,2024-12-08T23:50:27.879Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders1.json
363,4,1,20030106,10100,35.29,S24_3969,4,49,20030113,20030110,,2024-12-08T23:50:27.879Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders1.json
128,5,4,20030109,10101,108.06,S18_2325,5,25,20030118,20030111,,2024-12-08T23:50:27.879Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders1.json
128,6,1,20030109,10101,167.06,S18_2795,6,26,20030118,20030111,,2024-12-08T23:50:27.879Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders1.json
128,7,3,20030109,10101,32.53,S24_1937,7,45,20030118,20030111,,2024-12-08T23:50:27.879Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders1.json
128,8,2,20030109,10101,44.35,S24_2022,8,46,20030118,20030111,,2024-12-08T23:50:27.879Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders1.json
181,9,2,20030110,10102,95.55,S18_1342,9,39,20030118,20030114,,2024-12-08T23:50:27.879Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders1.json
181,10,1,20030110,10102,43.13,S18_1367,10,41,20030118,20030114,,2024-12-08T23:50:27.879Z,dbfs:/FileStore/ds2002-finalproject/stream/fact_orders1.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customer_key,bigint,
fact_order_key,bigint,
orderLineNumber,bigint,
order_date_key,bigint,
order_key,bigint,
priceEach,double,
productCode,string,
product_key,bigint,
quantityOrdered,bigint,
required_date_key,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
SELECT
    os.customer_key,
    dc.customerName AS customer_name,
    dc.phone AS customer_phone,
    dc.addressLine1 AS customer_address,
    dc.city AS customer_city,
    dc.country AS customer_country,
    dp.productName AS product_name,
    dp.productLine AS product_line,
    dp.MSRP AS product_price,
    dp.quantityInStock AS product_stock_quantity,
    de.employeeNumber AS employee_id,
    de.firstName AS employee_first_name,
    de.lastName AS employee_last_name,
    de.jobTitle AS employee_job_title,
    dd_order.full_date AS order_date,
    dd_required.full_date AS required_date,
    dd_shipped.full_date AS shipped_date,
    os.quantityOrdered AS quantity_ordered,
    os.priceEach AS price_each,
    os.quantityOrdered * os.priceEach AS total_price
  FROM orders_silver_tempview AS os
  INNER JOIN company_dlh.dim_customer AS dc
  ON dc.customer_key = os.customer_key
  INNER JOIN company_dlh.dim_employee AS de
  ON de.employeeNumber = de.employeeNumber
  INNER JOIN company_dlh.dim_product AS dp
  ON dp.product_key = os.product_key

  LEFT OUTER JOIN company_dlh.dim_date AS dd_order
  ON os.order_date_key = dd_order.date_key
  LEFT OUTER JOIN company_dlh.dim_date AS dd_required
  ON os.required_date_key = dd_required.date_key
  LEFT OUTER JOIN company_dlh.dim_date AS dd_shipped
  ON os.shipped_date_key = dd_shipped.date_key
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f290899ecd0>

In [0]:
%sql
SELECT * FROM fact_orders_silver
LIMIT 5

customer_key,customer_name,customer_phone,customer_address,customer_city,customer_country,product_name,product_line,product_price,product_stock_quantity,employee_id,employee_first_name,employee_last_name,employee_job_title,order_date,required_date,shipped_date,quantity_ordered,price_each,total_price
121,Baane Mini Imports,07-98 9555,Erling Skakkes gate 78,Stavern,Norway,1970 Dodge Coronet,Classic Cars,57.8,4074,1102,Gerard,Bondur,Sale Manager (EMEA),2003-01-29,2003-02-07,2003-02-02,42,94.07,3950.94
128,"Blauer See Auto, Co.",+49 69 66 90 2555,Lyonerstr. 34,Frankfurt,Germany,1969 Ford Falcon,Classic Cars,173.02,1049,1166,Leslie,Thompson,Sales Rep,2003-01-09,2003-01-18,2003-01-11,46,44.35,2040.1
141,Euro+ Shopping Channel,(91) 555 94 44,"C/ Moralzarzal, 86",Madrid,Spain,2002 Chevy Corvette,Classic Cars,107.08,9446,1286,Foon Yue,Tseng,Sales Rep,2003-01-31,2003-02-09,2003-02-01,44,30.41,1338.04
145,Danish Wholesale Imports,31 12 3555,Vinbæltet 34,Kobenhavn,Denmark,2002 Yamaha YZR M1,Motorcycles,81.36,600,1337,Loui,Bondur,Sales Rep,2003-02-11,2003-02-21,2003-02-12,31,60.72,1882.32
181,Vitachrome Inc.,2125551500,2678 Kingston Rd.,NYC,USA,1969 Dodge Charger,Classic Cars,115.16,7323,1102,Gerard,Bondur,Sale Manager (EMEA),2003-01-10,2003-01-18,2003-01-14,41,43.13,1768.3300000000002


In [0]:
%sql
DESCRIBE EXTENDED company_dlh.fact_orders_silver

col_name,data_type,comment
customer_key,bigint,
customer_name,varchar(65535),
customer_phone,varchar(65535),
customer_address,varchar(65535),
customer_city,varchar(65535),
customer_country,varchar(65535),
product_name,string,
product_line,string,
product_price,double,
product_stock_quantity,int,


##### Gold Table: Perform Aggregations
Executed two queries to prove it works:
First query displays aggregated sales data (total quantity ordered, total sales, average price, minimum price, and maximum price) for each product line, ordered by total sales in descending order. Second query displays the total number of orders and the total sales for each customer, ordered by the total sales in descending order.

In [0]:
%sql
SELECT 
    dp.productLine AS product_line,
    SUM(fo.quantity_ordered) AS total_quantity_ordered,
    SUM(fo.quantity_ordered * fo.price_each) AS total_sales,
    AVG(fo.price_each) AS average_price,
    MIN(fo.price_each) AS minimum_price,
    MAX(fo.price_each) AS maximum_price
FROM 
    company_dlh.fact_orders_silver fo
LEFT JOIN 
    dim_product dp ON fo.product_line = dp.productLine
GROUP BY 
    dp.productLine
ORDER BY 
    total_sales DESC



product_line,total_quantity_ordered,total_sales,average_price,minimum_price,maximum_price
Classic Cars,1158050,100498506.8999992,92.40710526315335,30.41,214.3
Motorcycles,146809,14581279.180000065,99.75230769230572,53.31,205.72


In [0]:
%sql
SELECT 
    c.customerName,
    COUNT(fo.order_date) AS total_orders,
    SUM(fo.quantity_ordered * fo.price_each) AS total_sales
FROM 
    company_dlh.fact_orders_silver AS fo
JOIN 
    dim_customer AS c ON fo.customer_key = c.customer_key
GROUP BY 
    c.customerName
ORDER BY 
    total_sales DESC

customerName,total_orders,total_sales
Baane Mini Imports,368,1155035.8500000006
Danish Wholesale Imports,276,1082400.6999999995
Euro+ Shopping Channel,299,924742.5999999992
"Blauer See Auto, Co.",92,242627.2300000002
Online Diecast Creations Co.,92,235148.0899999997
Vitachrome Inc.,46,126379.94


#### Clean up the File System

In [0]:
%fs rm -r /FileStore/ds2002-finalproject/