## Lab 06: Data Lakehouse with Structured Streaming
This lab will help you learn to use many of the software libraries and programming techniques required to fulfill the requirements of the final end-of-session capstone project for course **DS-2002: Data Systems**. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "twq8db-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "chinook_dw2"

connection_properties = {
  "user" : "twq8db",
  "password" : "password123#",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "ds2002.piqwm4s"
atlas_database_name = "chinook_dw2"
atlas_user_name = "twq8db"
atlas_password = "password123#"

# Data Files (JSON) Information ###############################
dst_database = "chinook_dlh"

base_dir = "dbfs:/FileStore/ds2002-final"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

invoice_output_bronze = f"{database_dir}/fact_invoice/bronze"
invoice_output_silver = f"{database_dir}/fact_invoice/silver"
invoice_output_gold   = f"{database_dir}/fact_invoice/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_invoice", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[2]: True

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS chinook_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS chinook_dlh
COMMENT "DS-2002 Final Database"
LOCATION "dbfs:/FileStore/ds2002-final/chinook_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wna8fw-mysql.mysql.database.azure.com:3306/northwind_dw2",
  dbtable "dim_date",
  user "jtupitza",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE chinook_dlh;

CREATE OR REPLACE TABLE chinook_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/chinook_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_product" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://twq8db-mysql.mysql.database.azure.com:3306/chinook_dw2?useSSL=true&requireSSL=false&verifyServerCertificate=true&sslMode=VERIFY_CA&user=twq8db&password=password123#",
  dbtable "dim_customer",
  user "twq8db",
  password "password123#"
)

In [0]:
%sql

-- Create a new table named "northwind_dlh.dim_product" using data from the view named "view_product"
USE DATABASE chinook_dlh;

CREATE OR REPLACE TABLE chinook_dlh.dim_customer
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final/northwind_dlh/dim_customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_customer;

col_name,data_type,comment
customer_key,int,
FirstName,string,
LastName,string,
Company,string,
Address,string,
City,string,
State,string,
Country,string,
PostalCode,string,
Phone,string,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_customer LIMIT 5

customer_key,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email
1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,+55 (12) 3923-5566,luisg@embraer.com.br
2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,,leonekohler@surfeu.de
3,François,Tremblay,,1498 rue Bélanger,Montréal,QC,Canada,H2G 1A7,+1 (514) 721-4711,,ftremblay@gmail.com
4,Bjørn,Hansen,,Ullevålsveien 14,Oslo,,Norway,0171,+47 22 44 22 22,,bjorn.hansen@yahoo.no
5,František,Wichterlová,JetBrains s.r.o.,Klanova 9/506,Prague,,Czech Republic,14700,+420 2 4172 5555,+420 2 4172 5555,frantisekw@jetbrains.com


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-final/source_data/batch/dim_artist.json,dim_artist.json,13704,1683766014000
dbfs:/FileStore/ds2002-final/source_data/batch/dim_customers.json,dim_customers.json,15302,1683766014000
dbfs:/FileStore/ds2002-final/source_data/batch/dim_invoice_line.csv,dim_invoice_line.csv,14363,1683766014000
dbfs:/FileStore/ds2002-final/source_data/batch/dim_invoice_line.json,dim_invoice_line.json,72316,1683766014000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002-final/source_data/batch'
json_files = {"artists" : 'dim_artist.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[15]: <pymongo.results.InsertManyResult at 0x7ff880d3a490>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val uri = "mongodb+srv://twq8db:password123#@ds2002.piqwm4s.mongodb.net/chinook_dw2"

val df_artist = spark.read.format("com.mongodb.spark.sql.DefaultSource") .option("uri", uri)
.option("database", "chinook_dw2").option("collection", "artists").load()
.select("artist_key","Name")

display(df_artist)

artist_key,Name
1,AC/DC
2,Accept
3,Aerosmith
4,Alanis Morissette
5,Alice In Chains
6,Antônio Carlos Jobim
7,Apocalyptica
8,Audioslave
9,BackBeat
10,Billy Cobham


In [0]:
%scala
df_artist.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_artist.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_artist")

In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_artist

col_name,data_type,comment
artist_key,int,
Name,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,chinook_dlh,
Table,dim_artist,
Type,MANAGED,
Location,dbfs:/FileStore/ds2002-final/chinook_dlh/dim_artist,
Provider,delta,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_artist LIMIT 5

artist_key,Name
1,AC/DC
2,Accept
3,Aerosmith
4,Alanis Morissette
5,Alice In Chains


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
invoice_line_csv = f"{batch_dir}/dim_invoice_line.csv"

df_invoice_line = spark.read.format('csv').options(header='true', inferSchema='true').load(invoice_line_csv)
display(df_invoice_line)

invoice_line_key,invoice_key,UnitPrice,Quantity
1,1,0.99,1
2,1,0.99,1
3,2,0.99,1
4,2,0.99,1
5,2,0.99,1
6,2,0.99,1
7,3,0.99,1
8,3,0.99,1
9,3,0.99,1
10,3,0.99,1


In [0]:
df_invoice_line.printSchema()

root
 |-- invoice_line_key: integer (nullable = true)
 |-- invoice_key: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- Quantity: integer (nullable = true)



In [0]:
df_invoice_line.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_invoice_line")

In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_invoice_line;

col_name,data_type,comment
invoice_line_key,int,
invoice_key,int,
UnitPrice,double,
Quantity,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,chinook_dlh,
Table,dim_invoice_line,
Type,MANAGED,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_invoice_line LIMIT 5;

invoice_line_key,invoice_key,UnitPrice,Quantity
1,1,0.99,1
2,1,0.99,1
3,2,0.99,1
4,2,0.99,1
5,2,0.99,1


##### Verify Dimension Tables

In [0]:
%sql
USE chinook_dlh;
SHOW TABLES

database,tableName,isTemporary
chinook_dlh,dim_artist,False
chinook_dlh,dim_customer,False
chinook_dlh,dim_date,False
chinook_dlh,dim_invoice_line,False
,view_customer,True
,view_date,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_invoice_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT")
 .option("cloudFiles.schemaHints", "invoice_key BIGINT")
 .option("cloudFiles.schemaHints", "invoice_line_key BIGINT")
 .option("cloudFiles.schemaHints", "invoice_date_key DECIMAL") 
 .option("cloudFiles.schemaHints", "BillingAddress STRING")
 .option("cloudFiles.schemaHints", "BillingCity STRING")
 .option("cloudFiles.schemaHints", "BillingState STRING")
 .option("cloudFiles.schemaHints", "BillingCountry STRING")
 .option("cloudFiles.schemaHints", "BillingPostalCode STRING") 
 .option("cloudFiles.schemaHints", "total_price DECIMAL")
 .option("cloudFiles.schemaHints", "customer_first_name STRING")
 .option("cloudFiles.schemaHints", "customer_last_name STRING")
 .option("cloudFiles.schemaHints", "Address STRING")
 .option("cloudFiles.schemaHints", "City STRING")
 .option("cloudFiles.schemaHints", "State STRING")
 .option("cloudFiles.schemaHints", "Country STRING")
 .option("cloudFiles.schemaHints", "PostalCode STRING")
 .option("cloudFiles.schemaHints", "customer_phone STRING")
 .option("cloudFiles.schemaHints", "customer_fax STRING")
 .option("cloudFiles.schemaHints", "customer_email STRING")
 .option("cloudFiles.schemaHints", "unit_price DOUBLE")
 .option("cloudFiles.schemaHints", "Quantity BIGINT")
 .option("cloudFiles.schemaLocation", invoice_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("invoice_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW invoice_output_bronze AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM invoice_raw_tempview
)

In [0]:
%sql
SELECT * FROM invoice_raw_tempview

Address,BillingAddress,BillingCity,BillingCountry,BillingPostalCode,BillingState,City,Country,PostalCode,Quantity,State,customer_email,customer_fax,customer_first_name,customer_key,customer_last_name,customer_phone,fact_invoice_key,invoice_date_key,invoice_key,invoice_line_key,total_price,unit_price,_rescued_data
"Via Degli Scipioni, 43","Via Degli Scipioni, 43",Rome,Italy,00192,RM,Rome,Italy,00192,1,RM,lucas.mancini@yahoo.it,,Lucas,47,Mancini,+39 06 39733434,667,1271116800000,108,577,5.94,0.99,
"Via Degli Scipioni, 43","Via Degli Scipioni, 43",Rome,Italy,00192,RM,Rome,Italy,00192,1,RM,lucas.mancini@yahoo.it,,Lucas,47,Mancini,+39 06 39733434,668,1271116800000,108,578,5.94,0.99,
"Via Degli Scipioni, 43","Via Degli Scipioni, 43",Rome,Italy,00192,RM,Rome,Italy,00192,1,RM,lucas.mancini@yahoo.it,,Lucas,47,Mancini,+39 06 39733434,669,1271116800000,108,579,5.94,0.99,
"Via Degli Scipioni, 43","Via Degli Scipioni, 43",Rome,Italy,00192,RM,Rome,Italy,00192,1,RM,lucas.mancini@yahoo.it,,Lucas,47,Mancini,+39 06 39733434,670,1271116800000,108,580,5.94,0.99,
"Via Degli Scipioni, 43","Via Degli Scipioni, 43",Rome,Italy,00192,RM,Rome,Italy,00192,1,RM,lucas.mancini@yahoo.it,,Lucas,47,Mancini,+39 06 39733434,671,1271116800000,108,581,5.94,0.99,
"Via Degli Scipioni, 43","Via Degli Scipioni, 43",Rome,Italy,00192,RM,Rome,Italy,00192,1,RM,lucas.mancini@yahoo.it,,Lucas,47,Mancini,+39 06 39733434,672,1271116800000,108,582,5.94,0.99,
"Via Degli Scipioni, 43","Via Degli Scipioni, 43",Rome,Italy,00192,RM,Rome,Italy,00192,1,RM,lucas.mancini@yahoo.it,,Lucas,47,Mancini,+39 06 39733434,673,1291248000000,160,872,0.99,0.99,
Ordynacka 10,Ordynacka 10,Warsaw,Poland,00-358,,Warsaw,Poland,00-358,1,,stanisław.wójcik@wp.pl,,Stanisław,49,Wójcik,+48 22 828 37 39,674,1254873600000,64,343,1.98,0.99,
Ordynacka 10,Ordynacka 10,Warsaw,Poland,00-358,,Warsaw,Poland,00-358,1,,stanisław.wójcik@wp.pl,,Stanisław,49,Wójcik,+48 22 828 37 39,675,1254873600000,64,344,1.98,0.99,
Ordynacka 10,Ordynacka 10,Warsaw,Poland,00-358,,Warsaw,Poland,00-358,1,,stanisław.wójcik@wp.pl,,Stanisław,49,Wójcik,+48 22 828 37 39,676,1258416000000,75,402,13.86,0.99,


In [0]:
(spark.table("invoice_raw_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{invoice_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_invoice_bronze"))

Out[27]: <pyspark.sql.streaming.query.StreamingQuery at 0x7ff880d44bb0>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_invoice_bronze")
  .createOrReplaceTempView("invoice_silver_tempview"))

In [0]:
%sql
SELECT * FROM invoice_silver_tempview

Address,BillingAddress,BillingCity,BillingCountry,BillingPostalCode,BillingState,City,Country,PostalCode,Quantity,State,customer_email,customer_fax,customer_first_name,customer_key,customer_last_name,customer_phone,fact_invoice_key,invoice_date_key,invoice_key,invoice_line_key,total_price,unit_price,_rescued_data
Theodor-Heuss-Straße 34,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,,Stuttgart,Germany,70174,1,,leonekohler@surfeu.de,,Leonie,2,Köhler,+49 0711 2842222,1,1230768000000,1,1,1.98,0.99,
Theodor-Heuss-Straße 34,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,,Stuttgart,Germany,70174,1,,leonekohler@surfeu.de,,Leonie,2,Köhler,+49 0711 2842222,2,1230768000000,1,2,1.98,0.99,
Theodor-Heuss-Straße 34,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,,Stuttgart,Germany,70174,1,,leonekohler@surfeu.de,,Leonie,2,Köhler,+49 0711 2842222,3,1234310400000,12,60,13.86,0.99,
Theodor-Heuss-Straße 34,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,,Stuttgart,Germany,70174,1,,leonekohler@surfeu.de,,Leonie,2,Köhler,+49 0711 2842222,4,1234310400000,12,61,13.86,0.99,
Theodor-Heuss-Straße 34,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,,Stuttgart,Germany,70174,1,,leonekohler@surfeu.de,,Leonie,2,Köhler,+49 0711 2842222,5,1234310400000,12,62,13.86,0.99,
Theodor-Heuss-Straße 34,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,,Stuttgart,Germany,70174,1,,leonekohler@surfeu.de,,Leonie,2,Köhler,+49 0711 2842222,6,1234310400000,12,63,13.86,0.99,
Theodor-Heuss-Straße 34,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,,Stuttgart,Germany,70174,1,,leonekohler@surfeu.de,,Leonie,2,Köhler,+49 0711 2842222,7,1234310400000,12,64,13.86,0.99,
Theodor-Heuss-Straße 34,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,,Stuttgart,Germany,70174,1,,leonekohler@surfeu.de,,Leonie,2,Köhler,+49 0711 2842222,8,1234310400000,12,65,13.86,0.99,
Theodor-Heuss-Straße 34,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,,Stuttgart,Germany,70174,1,,leonekohler@surfeu.de,,Leonie,2,Köhler,+49 0711 2842222,9,1234310400000,12,66,13.86,0.99,
Theodor-Heuss-Straße 34,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,,Stuttgart,Germany,70174,1,,leonekohler@surfeu.de,,Leonie,2,Köhler,+49 0711 2842222,10,1234310400000,12,67,13.86,0.99,


In [0]:
%sql
DESCRIBE EXTENDED invoice_silver_tempview

col_name,data_type,comment
Address,string,
BillingAddress,string,
BillingCity,string,
BillingCountry,string,
BillingPostalCode,string,
BillingState,string,
City,string,
Country,string,
PostalCode,string,
Quantity,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_invoice_silver_tempview AS (
  SELECT o.fact_invoice_key,
      o.invoice_key,
      o.customer_key,
      c.FirstName AS customer_first_name,
      c.LastName AS customer_last_name,
      c.Company AS customer_company,
      c.Address AS customer_address,
      c.City AS customer_city,
      c.State AS customer_state,
      c.Country AS customer_country,
      c.PostalCode AS customer_postal_code,
      c.Phone AS customer_phone,
      c.Fax AS customer_fax,
      c.email AS customer_email,

      il.invoice_line_key,
      il.UnitPrice AS unit_price,
      il.Quantity AS quantity,
     
      o.invoice_date_key,
      od.day_name_of_week AS invoice_day_name_of_week,
      od.day_of_month AS invoice_day_of_month,
      od.weekday_weekend AS invoice_weekday_weekend,
      od.month_name AS invoice_month_name,
      od.calendar_quarter AS invoice_quarter,
      od.calendar_year AS invoice_year,
      
      o.BillingAddress,
      o.BillingCity,
      o.BillingState,
      o.BillingCountry,
      o.BillingPostalcode

  FROM invoice_silver_tempview AS o
  INNER JOIN chinook_dlh.dim_customer AS c
  ON c.customer_key = o.customer_key
  INNER JOIN chinook_dlh.dim_invoice_line AS il
  ON il.invoice_line_key = o.invoice_line_key
  LEFT OUTER JOIN chinook_dlh.dim_date AS od
  ON od.date_key = o.invoice_date_key
)

In [0]:
(spark.table("fact_invoice_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{invoice_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_invoice_silver"))

Out[32]: <pyspark.sql.streaming.query.StreamingQuery at 0x7ff880d44220>

In [0]:
%sql
SELECT * FROM fact_invoice_silver

fact_invoice_key,invoice_key,customer_key,customer_first_name,customer_last_name,customer_company,customer_address,customer_city,customer_state,customer_country,customer_postal_code,customer_phone,customer_fax,customer_email,invoice_line_key,unit_price,quantity,invoice_date_key,invoice_day_name_of_week,invoice_day_of_month,invoice_weekday_weekend,invoice_month_name,invoice_quarter,invoice_year,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalcode


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.fact_invoice_silver

col_name,data_type,comment
fact_invoice_key,bigint,
invoice_key,bigint,
customer_key,bigint,
customer_first_name,string,
customer_last_name,string,
customer_company,string,
customer_address,string,
customer_city,string,
customer_state,string,
customer_country,string,


##### 6.3. Gold Table: Perform Aggregations

In [0]:

%sql
--Query that returns the Total Price grouped by the Quarter Created and Customer
SELECT po.invoice_quarter AS Quarter_Created,
  SUM(po.unit_price) AS Unit_Price
FROM fact_invoice_silver_tempview AS po
INNER JOIN chinook_dlh.dim_customer AS p
ON p.customer_key = po.customer_key
GROUP BY Quarter_Created
ORDER BY Unit_Price DESC

Quarter_Created,Unit_Price
,1020.0100000000048


#### 9.0. Clean up the File System