## Capstone Project: Data Lakehouse with Structured Streaming on Chinook Database
This capstone project demonstrates proficiency with many of the software libraries, frameworks, and programming techniques introduced throughout the DS-2002: Data Systems course. It builds a Data Lakehouse on Databricks that uses the Chinook database as its transactional source: dimension tables are loaded from MySQL, MongoDB Atlas, and data files, and sales facts are ingested with Structured Streaming into Bronze and Silver tables for analysis.

**The technologies and techniques demonstrated include:**
- Relational Database Management Systems (MySQL)
- NoSQL *(Not Only SQL)* Systems (MongoDB)
- File System *(Data Lake)* Source Systems (Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (JSON, CSV)
- Massively Parallel Processing *(MPP)* Data Integration Systems (Databricks)
- Data Integration Patterns (Extract-Transform-Load)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # pandas API on Spark (formerly Koalas), bundled with PySpark 3.2 and newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "wnz7kd-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "chinook"

connection_properties = {
  "user" : "wnz7kd",
  "password" : "CPayne091203",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sandbox.pn3ju"
atlas_database_name = "chinook"
atlas_user_name = "cpayne"
atlas_password = "CPayne091203"

# Data Files (JSON) Information ###############################
# Destination database name
dst_database = "chinook_dlh"

# Base directory in DBFS
base_dir = "dbfs:/FileStore/project-data"
database_dir = f"{base_dir}/{dst_database}"

# Batch and stream data directories
batch_dir = f"{base_dir}/batch"    # Batch (cold-path) source files
stream_dir = f"{base_dir}/stream"  # Streaming (hot-path) source files

# Stream directories for Chinook
sales_stream_dir = f"{stream_dir}/sales"  # Sales line items

# Output paths for Bronze, Silver, and Gold layers

sales_output_bronze = f"{database_dir}/fact_sales/bronze"
sales_output_silver = f"{database_dir}/fact_sales/silver"
sales_output_gold   = f"{database_dir}/fact_sales/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_sales", True) 


# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True
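
The three layer paths above, and the `_checkpoint` folder that each streaming write uses later in this notebook, all follow one naming pattern. A minimal sketch of that pattern as a helper, assuming nothing beyond string formatting; `layer_paths` is a hypothetical name and is not used elsewhere in the notebook:

```python
def layer_paths(database_dir: str, table: str) -> dict:
    """Return the output and checkpoint path for each medallion layer of a table."""
    paths = {}
    for layer in ("bronze", "silver", "gold"):
        output = f"{database_dir}/{table}/{layer}"
        # Each streaming write keeps its checkpoint under the layer's own folder
        paths[layer] = {"output": output, "checkpoint": f"{output}/_checkpoint"}
    return paths


paths = layer_paths("dbfs:/FileStore/project-data/chinook_dlh", "fact_sales")
print(paths["bronze"]["output"])
print(paths["silver"]["checkpoint"])
```

Keeping the pattern in one place makes it harder for a layer's output and checkpoint locations to drift apart.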

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.'''
    # Create a client connection to MongoDB
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    # Query MongoDB, applying whichever of the filter, projection, and sort arguments were supplied
    db = client[db_name]
    cursor = db[collection].find(conditions or {}, projection or None)
    if sort:
        cursor = cursor.sort(sort)

    # Fill a Python list with the documents and use it to create a DataFrame
    dframe = pd.DataFrame(list(cursor))

    client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create (or replace) one MongoDB collection per JSON file and return the last insert result.'''
    # Create a client connection to MongoDB
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    result = None  # Stays None when json_files is empty

    # Read in each JSON file, and use it to create a new collection
    for collection_name in json_files:
        db.drop_collection(collection_name)
        json_file = os.path.join(src_file_path, json_files[collection_name])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        result = db[collection_name].insert_many(json_object)

    client.close()

    return result
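
`set_mongo_collection` indexes `json_files` by collection name to look up a file name, so it expects a dictionary that maps each collection name to the JSON file that populates it. The sketch below shows that shape and the file-loading half of the function without connecting to MongoDB. The helper name `load_json_collections` and the sample file are hypothetical; the non-empty-list check reflects the fact that PyMongo's `insert_many()` rejects an empty list.

```python
import json
import os
import tempfile


def load_json_collections(src_file_path, json_files):
    """Read each JSON file and return {collection_name: list_of_documents}."""
    collections = {}
    for collection_name, file_name in json_files.items():
        with open(os.path.join(src_file_path, file_name), "r") as openfile:
            documents = json.load(openfile)
        # insert_many() needs a non-empty list of documents
        if not isinstance(documents, list) or not documents:
            raise ValueError(f"{file_name} must contain a non-empty JSON array")
        collections[collection_name] = documents
    return collections


# Hypothetical sample file, written to a temporary folder for the demonstration
with tempfile.TemporaryDirectory() as tmp_dir:
    with open(os.path.join(tmp_dir, "genre.json"), "w") as f:
        json.dump([{"GenreId": 1, "Name": "Rock"}, {"GenreId": 2, "Name": "Jazz"}], f)
    loaded = load_json_collections(tmp_dir, {"Genre": "genre.json"})

print({name: len(docs) for name, docs in loaded.items()})
```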

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS chinook_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS chinook_dlh
COMMENT "DS-2002 ProjectCapstone Database"
LOCATION "dbfs:/FileStore/project-data/chinook_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 ProjectCapstone");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wnz7kd-mysql.mysql.database.azure.com/chinook_dlh", --Replace with your Server Name
  dbtable "dim_date",
  user "wnz7kd",    --Replace with your User Name
  password "CPayne091203"  --Replace with your password
)
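
The view above hard-codes its JDBC URL, while the global variables cell already defines `jdbc_hostname` and `jdbc_port`. A minimal sketch of composing the same kind of URL from those parts; `build_jdbc_url` is a hypothetical helper, and the explicit port is an addition that the URL in the view omits:

```python
def build_jdbc_url(hostname: str, port: int, database: str) -> str:
    """Compose a MySQL JDBC URL from its host, port, and database name."""
    return f"jdbc:mysql://{hostname}:{port}/{database}"


# Values copied from the notebook's global variables and the view definition
jdbc_url = build_jdbc_url("wnz7kd-mysql.mysql.database.azure.com", 3306, "chinook_dlh")
print(jdbc_url)
```

Building the URL in one place means a change of server or database only has to be made once, rather than in every view definition.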

In [0]:
%sql
USE DATABASE chinook_dlh;

CREATE OR REPLACE TABLE chinook_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/project-data/chinook_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20210101,2021-01-01,01-Jan-2021,01-01-2021,01-01-2021,6,Friday,1,1,Weekday,53,January,1,Y,1,2021,2021-01,2021Q1,7,3,2021,2021-07,2021Q3
20210102,2021-01-02,02-Jan-2021,01-02-2021,02-01-2021,7,Saturday,2,2,Weekend,53,January,1,Y,1,2021,2021-01,2021Q1,7,3,2021,2021-07,2021Q3
20210103,2021-01-03,03-Jan-2021,01-03-2021,03-01-2021,1,Sunday,3,3,Weekend,53,January,1,Y,1,2021,2021-01,2021Q1,7,3,2021,2021-07,2021Q3
20210104,2021-01-04,04-Jan-2021,01-04-2021,04-01-2021,2,Monday,4,4,Weekday,1,January,1,Y,1,2021,2021-01,2021Q1,7,3,2021,2021-07,2021Q3
20210105,2021-01-05,05-Jan-2021,01-05-2021,05-01-2021,3,Tuesday,5,5,Weekday,1,January,1,Y,1,2021,2021-01,2021Q1,7,3,2021,2021-07,2021Q3


##### 1.3. Create a New Table that Sources Artist Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_artists" that extracts data from your MySQL chinook database.
CREATE OR REPLACE TEMPORARY VIEW view_artists
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wnz7kd-mysql.mysql.database.azure.com/chinook_dlh",
  dbtable "dim_artists",
  user "wnz7kd",
  password "CPayne091203"
);



In [0]:
%sql
-- Switch to the correct database
USE DATABASE chinook_dlh;

-- Create or replace the Delta table
CREATE OR REPLACE TABLE chinook_dlh.dim_artists
COMMENT "Artist Dimension Table"
LOCATION "dbfs:/FileStore/project-data/chinook_dlh/dim_artists"
AS SELECT 
  artist_id AS artist_key,  -- Rename the column
  artist_name
FROM view_artists;


num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_artists;

col_name,data_type,comment
artist_key,int,
artist_name,varchar(255),
,,
# Delta Statistics Columns,,
Column Names,"artist_key, artist_name",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,chinook_dlh,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_artists LIMIT 5

artist_key,artist_name
1,AC/DC
2,Accept
3,Aerosmith
4,Alanis Morissette
5,Alice In Chains


##### 1.4. Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_customers
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wnz7kd-mysql.mysql.database.azure.com/chinook_dlh",
  dbtable "dim_customers", -- Change to your MySQL table for customers
  user "wnz7kd",
  password "CPayne091203"
);


In [0]:
%sql
USE DATABASE chinook_dlh;

-- Create or replace the Delta table for customers
CREATE OR REPLACE TABLE chinook_dlh.dim_customers
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/project-data/chinook_dlh/dim_customers"
AS SELECT 
  customer_id AS customer_key, -- Rename the column to 'customer_key'
  first_name,
  last_name,
  company,
  address,
  city,
  state,
  country,
  postal_code,
  phone,
  email
FROM view_customers;


num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_customers;

col_name,data_type,comment
customer_key,int,
first_name,varchar(50),
last_name,varchar(50),
company,varchar(100),
address,varchar(255),
city,varchar(100),
state,varchar(100),
country,varchar(100),
postal_code,varchar(20),
phone,varchar(20),


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_customers LIMIT 5

customer_key,first_name,last_name,company,address,city,state,country,postal_code,phone,email
1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,luisg@embraer.com.br
2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,leonekohler@surfeu.de
3,François,Tremblay,,1498 rue Bélanger,Montréal,QC,Canada,H2G 1A7,+1 (514) 721-4711,ftremblay@gmail.com
4,Bjørn,Hansen,,Ullevålsveien 14,Oslo,,Norway,0171,+47 22 44 22 22,bjorn.hansen@yahoo.no
5,František,Wichterlová,JetBrains s.r.o.,Klanova 9/506,Prague,,Czech Republic,14700,+420 2 4172 5555,frantisekw@jetbrains.com


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database

##### 2.1.1. Fetch Track Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "cpayne"
val pwd = "CPayne091203"
val clusterName = "sandbox.pn3ju"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"
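
A connection string like the one above breaks if the user name or password contains reserved characters such as `@`, `:`, or `/`, because those characters also delimit the parts of the URI. The sketch below, written in Python for consistency with the rest of the notebook, percent-encodes the credentials before interpolating them. The function name and the sample credentials are hypothetical.

```python
from urllib.parse import quote_plus


def build_atlas_uri(user_name: str, password: str, cluster_name: str) -> str:
    """Build a mongodb+srv URI with percent-encoded credentials."""
    return (
        f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
        f"@{cluster_name}.mongodb.net/?retryWrites=true&w=majority"
    )


# Hypothetical credentials chosen to contain reserved characters
uri = build_atlas_uri("demo_user", "p@ss:word/1", "sandbox.example")
print(uri)
```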

In [0]:
%scala

val df_track = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "chinook") // MongoDB database name
  .option("collection", "Track") // MongoDB collection name
  .load()
  .withColumnRenamed("TrackId", "track_key") // Rename TrackId to track_key
  .withColumnRenamed("Name", "track_name")  // Optionally rename Name to track_name
  .withColumnRenamed("AlbumId", "album_key") 
  .withColumnRenamed("GenreId", "genre_key") 

// Display the DataFrame to verify the changes
display(df_track)


album_key,Bytes,Composer,genre_key,MediaTypeId,Milliseconds,track_name,track_key,UnitPrice,_id
1,11170334,"Angus Young, Malcolm Young, Brian Johnson",1,1,343719,For Those About To Rock (We Salute You),1,0.99,List(6722801a19c3d1997911aaef)
2,5510424,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Baltes, S. Kaufmann, G. Hoffmann",1,2,342562,Balls to the Wall,2,0.99,List(6722801a19c3d1997911aaf0)
3,3990994,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Hoffman",1,2,230619,Fast As a Shark,3,0.99,List(6722801a19c3d1997911aaf1)
3,4331779,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. Dirkscneider & W. Hoffman",1,2,252051,Restless and Wild,4,0.99,List(6722801a19c3d1997911aaf2)
3,6290521,Deaffy & R.A. Smith-Diesel,1,2,375418,Princess of the Dawn,5,0.99,List(6722801a19c3d1997911aaf3)
1,6713451,"Angus Young, Malcolm Young, Brian Johnson",1,1,205662,Put The Finger On You,6,0.99,List(6722801a19c3d1997911aaf4)
1,7636561,"Angus Young, Malcolm Young, Brian Johnson",1,1,233926,Let's Get It Up,7,0.99,List(6722801a19c3d1997911aaf5)
1,6852860,"Angus Young, Malcolm Young, Brian Johnson",1,1,210834,Inject The Venom,8,0.99,List(6722801a19c3d1997911aaf6)
1,6599424,"Angus Young, Malcolm Young, Brian Johnson",1,1,203102,Snowballed,9,0.99,List(6722801a19c3d1997911aaf7)
1,8611245,"Angus Young, Malcolm Young, Brian Johnson",1,1,263497,Evil Walks,10,0.99,List(6722801a19c3d1997911aaf8)


In [0]:
%scala
df_track.printSchema()

##### 2.1.2. Use the Spark DataFrame to Create a New Track Dimension Table in the Databricks Metadata Database (chinook_dlh)

In [0]:
%scala
df_track.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_track")

In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_track

col_name,data_type,comment
album_key,int,
Bytes,int,
Composer,string,
genre_key,int,
MediaTypeId,int,
Milliseconds,int,
track_name,string,
track_key,int,
UnitPrice,double,
_id,struct,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_track LIMIT 5

album_key,Bytes,Composer,genre_key,MediaTypeId,Milliseconds,track_name,track_key,UnitPrice,_id
1,11170334,"Angus Young, Malcolm Young, Brian Johnson",1,1,343719,For Those About To Rock (We Salute You),1,0.99,List(6722801a19c3d1997911aaef)
2,5510424,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Baltes, S. Kaufmann, G. Hoffmann",1,2,342562,Balls to the Wall,2,0.99,List(6722801a19c3d1997911aaf0)
3,3990994,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Hoffman",1,2,230619,Fast As a Shark,3,0.99,List(6722801a19c3d1997911aaf1)
3,4331779,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. Dirkscneider & W. Hoffman",1,2,252051,Restless and Wild,4,0.99,List(6722801a19c3d1997911aaf2)
3,6290521,Deaffy & R.A. Smith-Diesel,1,2,375418,Princess of the Dawn,5,0.99,List(6722801a19c3d1997911aaf3)


##### 2.2.1. Fetch Album Dimension Data from the New MongoDB Collection

In [0]:
%scala

val df_album = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "chinook") // MongoDB database name
  .option("collection", "Album") // MongoDB collection name
  .load()
  .withColumnRenamed("AlbumId", "album_key") // Rename AlbumId to album_key
  .withColumnRenamed("Title", "album_name")  // Rename Title to album_name
  .withColumnRenamed("ArtistId", "artist_key") 

// Display the DataFrame to verify the changes
display(df_album)

album_key,artist_key,album_name,_id
1,1,For Those About To Rock We Salute You,List(6722801819c3d1997911a994)
2,2,Balls to the Wall,List(6722801819c3d1997911a995)
3,2,Restless and Wild,List(6722801819c3d1997911a996)
4,1,Let There Be Rock,List(6722801819c3d1997911a997)
5,3,Big Ones,List(6722801819c3d1997911a998)
6,4,Jagged Little Pill,List(6722801819c3d1997911a999)
7,5,Facelift,List(6722801819c3d1997911a99a)
8,6,Warner 25 Anos,List(6722801819c3d1997911a99b)
9,7,Plays Metallica By Four Cellos,List(6722801819c3d1997911a99c)
10,8,Audioslave,List(6722801819c3d1997911a99d)


##### 2.2.2. Use the Spark DataFrame to Create a New Albums Dimension Table in the Databricks Metadata Database (chinook_dlh)

In [0]:
%scala
df_album.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_album")

In [0]:
%sql
SELECT * FROM chinook_dlh.dim_album LIMIT 5

album_key,artist_key,album_name,_id
1,1,For Those About To Rock We Salute You,List(6722801819c3d1997911a994)
2,2,Balls to the Wall,List(6722801819c3d1997911a995)
3,2,Restless and Wild,List(6722801819c3d1997911a996)
4,1,Let There Be Rock,List(6722801819c3d1997911a997)
5,3,Big Ones,List(6722801819c3d1997911a998)


##### 2.3.1. Fetch Invoice Dimension Data from the New MongoDB Collection

In [0]:
%scala
import org.apache.spark.sql.functions.date_format
val df_invoice = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri) // MongoDB Atlas connection URI
.option("database", "chinook") // MongoDB database name
.option("collection", "Invoice") // Collection name for invoices
.load()
.withColumnRenamed("InvoiceId", "invoice_key") // Rename InvoiceId to invoice_key
.withColumnRenamed("CustomerId", "customer_key") // Rename CustomerId to customer_key
.withColumn("date_key", date_format($"InvoiceDate", "yyyyMMdd"))
.select(
    "invoice_key",       // Primary key for the invoice
    "customer_key",      // Foreign key linking to the customer
    "date_key",     // Date of the invoice
    "BillingAddress",  // Billing address for the invoice
    "BillingCity",     // Billing city
    "BillingState",    // Billing state
    "BillingCountry",  // Billing country
    "BillingPostalCode", // Postal code for billing
    "Total"            // Total amount of the invoice
)

display(df_invoice)


invoice_key,customer_key,date_key,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
1,2,20210101,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98
2,4,20210102,Ullevålsveien 14,Oslo,,Norway,0171,3.96
3,8,20210103,Grétrystraat 63,Brussels,,Belgium,1000,5.94
4,14,20210106,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91
5,23,20210111,69 Salem Street,Boston,MA,USA,2113,13.86
6,37,20210119,Berger Straße 10,Frankfurt,,Germany,60316,0.99
7,38,20210201,Barbarossastraße 19,Berlin,,Germany,10779,1.98
8,40,20210201,"8, Rue Hanovre",Paris,,France,75002,1.98
9,42,20210202,"9, Place Louis Barthou",Bordeaux,,France,33000,3.96
10,46,20210203,3 Chatham Street,Dublin,Dublin,Ireland,,5.94


In [0]:
%scala
df_invoice.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Invoices Dimension Table in the Databricks Metadata Database (chinook_dlh)

In [0]:
%scala
df_invoice.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_invoice")

In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_invoice

col_name,data_type,comment
invoice_key,int,
customer_key,int,
date_key,string,
BillingAddress,string,
BillingCity,string,
BillingState,string,
BillingCountry,string,
BillingPostalCode,string,
Total,double,
,,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_invoice LIMIT 5

invoice_key,customer_key,date_key,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
1,2,20210101,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98
2,4,20210102,Ullevålsveien 14,Oslo,,Norway,0171,3.96
3,8,20210103,Grétrystraat 63,Brussels,,Belgium,1000,5.94
4,14,20210106,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91
5,23,20210111,69 Salem Street,Boston,MA,USA,2113,13.86


##### 2.4.1. Fetch Invoice Line Dimension Data from the New MongoDB Collection

In [0]:
%scala
val df_invoice_lines = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri) // MongoDB Atlas connection URI
.option("database", "chinook") // MongoDB database name
.option("collection", "InvoiceLine") // Collection name for invoice lines
.load()
.withColumnRenamed("InvoiceLineId", "invoice_line_key") // Rename InvoiceLineId to invoice_line_key
.withColumnRenamed("InvoiceId", "invoice_key") // Rename InvoiceId to invoice_key
.withColumnRenamed("TrackId", "track_key") // Rename TrackId to track_key
.select(
    "invoice_line_key",  // Primary key for the invoice line
    "invoice_key",       // Foreign key linking to the invoice
    "track_key",         // Foreign key linking to the track (product)
    "UnitPrice",         // Price per track
    "Quantity"           // Quantity purchased
)
display(df_invoice_lines)


invoice_line_key,invoice_key,track_key,UnitPrice,Quantity
1,1,2,0.99,1
2,1,4,0.99,1
3,2,6,0.99,1
4,2,8,0.99,1
5,2,10,0.99,1
6,2,12,0.99,1
7,3,16,0.99,1
8,3,20,0.99,1
9,3,24,0.99,1
10,3,28,0.99,1


In [0]:
%scala
df_invoice_lines.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Invoice Line Dimension Table in the Databricks Metadata Database (chinook_dlh)

In [0]:
%scala
df_invoice_lines.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_invoice_lines")

In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_invoice_lines

col_name,data_type,comment
invoice_line_key,int,
invoice_key,int,
track_key,int,
UnitPrice,double,
Quantity,int,
,,
# Delta Statistics Columns,,
Column Names,"Quantity, UnitPrice, invoice_line_key, invoice_key, track_key",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_invoice_lines LIMIT 5

invoice_line_key,invoice_key,track_key,UnitPrice,Quantity
1,1,2,0.99,1
2,1,4,0.99,1
3,2,6,0.99,1
4,2,8,0.99,1
5,2,10,0.99,1


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
genre_csv = f"{batch_dir}/genre.csv"

df_genre = spark.read.format('csv').options(header='true', inferSchema='true').load(genre_csv)
df_genre = df_genre.withColumnRenamed("GenreId", "genre_key")
display(df_genre)

genre_key,Name
1,Rock
2,Jazz
3,Metal
4,Alternative & Punk
5,Rock And Roll
6,Blues
7,Latin
8,Reggae
9,Pop
10,Soundtrack


In [0]:
df_genre.printSchema()

root
 |-- genre_key: integer (nullable = true)
 |-- Name: string (nullable = true)



In [0]:
df_genre.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_genres")

In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_genres;

col_name,data_type,comment
genre_key,int,
Name,string,
,,
# Delta Statistics Columns,,
Column Names,"genre_key, Name",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,chinook_dlh,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_genres LIMIT 5;

genre_key,Name
1,Rock
2,Jazz
3,Metal
4,Alternative & Punk
5,Rock And Roll


##### 3.2. Use PySpark to Read Employee Dimension Data from a JSON File

In [0]:
employee_json = f"{batch_dir}/employee.json"

df_employee = spark.read.format('json').options(inferSchema='true', multiline='true').load(employee_json)
df_employee = df_employee.withColumnRenamed("EmployeeId", "employee_key")
display(df_employee)

Address,BirthDate,City,Country,Email,employee_key,Fax,FirstName,HireDate,LastName,Phone,PostalCode,ReportsTo,State,Title
11120 Jasper Ave NW,1962-02-18 00:00:00,Edmonton,Canada,andrew@chinookcorp.com,1,+1 (780) 428-3457,Andrew,2002-08-14 00:00:00,Adams,+1 (780) 428-9482,T5K 2N1,,AB,General Manager
825 8 Ave SW,1958-12-08 00:00:00,Calgary,Canada,nancy@chinookcorp.com,2,+1 (403) 262-3322,Nancy,2002-05-01 00:00:00,Edwards,+1 (403) 262-3443,T2P 2T3,1.0,AB,Sales Manager
1111 6 Ave SW,1973-08-29 00:00:00,Calgary,Canada,jane@chinookcorp.com,3,+1 (403) 262-6712,Jane,2002-04-01 00:00:00,Peacock,+1 (403) 262-3443,T2P 5M5,2.0,AB,Sales Support Agent
683 10 Street SW,1947-09-19 00:00:00,Calgary,Canada,margaret@chinookcorp.com,4,+1 (403) 263-4289,Margaret,2003-05-03 00:00:00,Park,+1 (403) 263-4423,T2P 5G3,2.0,AB,Sales Support Agent
7727B 41 Ave,1965-03-03 00:00:00,Calgary,Canada,steve@chinookcorp.com,5,1 (780) 836-9543,Steve,2003-10-17 00:00:00,Johnson,1 (780) 836-9987,T3B 1Y7,2.0,AB,Sales Support Agent
5827 Bowness Road NW,1973-07-01 00:00:00,Calgary,Canada,michael@chinookcorp.com,6,+1 (403) 246-9899,Michael,2003-10-17 00:00:00,Mitchell,+1 (403) 246-9887,T3B 0C5,1.0,AB,IT Manager
590 Columbia Boulevard West,1970-05-29 00:00:00,Lethbridge,Canada,robert@chinookcorp.com,7,+1 (403) 456-8485,Robert,2004-01-02 00:00:00,King,+1 (403) 456-9986,T1K 5N8,6.0,AB,IT Staff
923 7 ST NW,1968-01-09 00:00:00,Lethbridge,Canada,laura@chinookcorp.com,8,+1 (403) 467-8772,Laura,2004-03-04 00:00:00,Callahan,+1 (403) 467-3351,T1H 1Y8,6.0,AB,IT Staff


In [0]:
df_employee.printSchema()

root
 |-- Address: string (nullable = true)
 |-- BirthDate: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- employee_key: long (nullable = true)
 |-- Fax: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- HireDate: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- ReportsTo: long (nullable = true)
 |-- State: string (nullable = true)
 |-- Title: string (nullable = true)



In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_employees")

In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_employees;

col_name,data_type,comment
Address,string,
BirthDate,string,
City,string,
Country,string,
Email,string,
employee_key,bigint,
Fax,string,
FirstName,string,
HireDate,string,
LastName,string,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_employees LIMIT 5;

Address,BirthDate,City,Country,Email,employee_key,Fax,FirstName,HireDate,LastName,Phone,PostalCode,ReportsTo,State,Title
11120 Jasper Ave NW,1962-02-18 00:00:00,Edmonton,Canada,andrew@chinookcorp.com,1,+1 (780) 428-3457,Andrew,2002-08-14 00:00:00,Adams,+1 (780) 428-9482,T5K 2N1,,AB,General Manager
825 8 Ave SW,1958-12-08 00:00:00,Calgary,Canada,nancy@chinookcorp.com,2,+1 (403) 262-3322,Nancy,2002-05-01 00:00:00,Edwards,+1 (403) 262-3443,T2P 2T3,1.0,AB,Sales Manager
1111 6 Ave SW,1973-08-29 00:00:00,Calgary,Canada,jane@chinookcorp.com,3,+1 (403) 262-6712,Jane,2002-04-01 00:00:00,Peacock,+1 (403) 262-3443,T2P 5M5,2.0,AB,Sales Support Agent
683 10 Street SW,1947-09-19 00:00:00,Calgary,Canada,margaret@chinookcorp.com,4,+1 (403) 263-4289,Margaret,2003-05-03 00:00:00,Park,+1 (403) 263-4423,T2P 5G3,2.0,AB,Sales Support Agent
7727B 41 Ave,1965-03-03 00:00:00,Calgary,Canada,steve@chinookcorp.com,5,1 (780) 836-9543,Steve,2003-10-17 00:00:00,Johnson,1 (780) 836-9987,T3B 1Y7,2.0,AB,Sales Support Agent


##### Verify Dimension Tables

In [0]:
%sql
USE chinook_dlh;
SHOW TABLES

database,tableName,isTemporary
chinook_dlh,dim_album,False
chinook_dlh,dim_artists,False
chinook_dlh,dim_customers,False
chinook_dlh,dim_date,False
chinook_dlh,dim_employees,False
chinook_dlh,dim_genres,False
chinook_dlh,dim_invoice,False
chinook_dlh,dim_invoice_lines,False
chinook_dlh,dim_track,False
,_sqldf,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Sales Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", sales_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(sales_stream_dir)
 .createOrReplaceTempView("sales_raw_tempview"))
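
Auto Loader keeps track of the files it has already ingested, so each file that lands in the stream directory is processed once. The toy model below illustrates only that idea of incremental discovery; it is not how Auto Loader is implemented. The function name is hypothetical, and `sales_fact_split2.json` is an assumed second file name.

```python
def discover_new_files(all_files, processed):
    """Return files not seen before, and remember them for the next call."""
    new_files = sorted(f for f in all_files if f not in processed)
    processed.update(new_files)
    return new_files


processed_files = set()  # Plays the role of the stream's saved state

# First run: only one file has arrived
first = discover_new_files(["sales_fact_split1.json"], processed_files)

# Second run: a second (assumed) file has arrived; the first is not reprocessed
second = discover_new_files(
    ["sales_fact_split1.json", "sales_fact_split2.json"], processed_files
)

print(first)
print(second)
```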

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW sales_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM sales_raw_tempview
)
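
The view above appends two traceability columns to every raw record: the time the record was received and the file it came from. A plain-Python sketch of the same enrichment, assuming records are dictionaries; `add_bronze_metadata` is a hypothetical helper, and a single timestamp is shared by all records in the batch:

```python
from datetime import datetime, timezone


def add_bronze_metadata(records, source_file, receipt_time=None):
    """Return copies of the records with receipt_time and source_file added."""
    receipt_time = receipt_time or datetime.now(timezone.utc)
    return [
        {**record, "receipt_time": receipt_time, "source_file": source_file}
        for record in records
    ]


raw = [{"InvoiceLineID": 1, "ProductID": 2, "Quantity": 1, "UnitPrice": 0.99}]
fixed_time = datetime(2024, 12, 8, 1, 38, 12, tzinfo=timezone.utc)
bronze = add_bronze_metadata(
    raw, "dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json", fixed_time
)
print(bronze[0]["source_file"], bronze[0]["receipt_time"].isoformat())
```

Recording the source file on each row makes it possible to trace a questionable value in a downstream table back to the file that delivered it.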

In [0]:
%sql
SELECT * FROM sales_bronze_tempview

CustomerID,InvoiceID,InvoiceLineID,ProductID,Quantity,UnitPrice,date_key,_rescued_data,receipt_time,source_file
2,1,1,2,1,0.99,20210101,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
2,1,2,4,1,0.99,20210101,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
4,2,3,6,1,0.99,20210102,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
4,2,4,8,1,0.99,20210102,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
4,2,5,10,1,0.99,20210102,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
4,2,6,12,1,0.99,20210102,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
8,3,7,16,1,0.99,20210103,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
8,3,8,20,1,0.99,20210103,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
8,3,9,24,1,0.99,20210103,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
8,3,10,28,1,0.99,20210103,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json


In [0]:
(spark.table("sales_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{sales_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_sales_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f2e6354ba10>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_sales_bronze")
  .createOrReplaceTempView("sales_silver_tempview"))

In [0]:
%sql
SELECT * FROM sales_silver_tempview

CustomerID,InvoiceID,InvoiceLineID,ProductID,Quantity,UnitPrice,date_key,_rescued_data,receipt_time,source_file
2,1,1,2,1,0.99,20210101,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
2,1,2,4,1,0.99,20210101,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
4,2,3,6,1,0.99,20210102,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
4,2,4,8,1,0.99,20210102,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
4,2,5,10,1,0.99,20210102,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
4,2,6,12,1,0.99,20210102,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
8,3,7,16,1,0.99,20210103,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
8,3,8,20,1,0.99,20210103,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
8,3,9,24,1,0.99,20210103,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json
8,3,10,28,1,0.99,20210103,,2024-12-08T01:38:12.806Z,dbfs:/FileStore/project-data/stream/sales/sales_fact_split1.json


In [0]:
%sql
DESCRIBE EXTENDED sales_silver_tempview

col_name,data_type,comment
CustomerID,bigint,
InvoiceID,bigint,
InvoiceLineID,bigint,
ProductID,bigint,
Quantity,bigint,
UnitPrice,double,
date_key,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_sales_silver_tempview AS (
  SELECT 
      o.CustomerID AS customer_key,          -- Customer ID as foreign key
      o.InvoiceID AS invoice_key,            -- Invoice ID as foreign key
      o.InvoiceLineID AS invoice_line_key,   -- Invoice Line ID
      o.ProductID AS track_key,              -- Product (Track) ID as foreign key
      o.Quantity,                            -- Quantity of the product sold
      o.UnitPrice,                           -- Unit price of the product
      (o.Quantity * o.UnitPrice) AS line_total, -- Calculate line total
      o.date_key,                            -- Date key for the transaction
      t.track_name,                          -- Name of the track
      t.Composer AS track_composer,          -- Composer of the track
      t.genre_key AS genre_id,               -- Genre ID
      g.Name AS genre_name,                  -- Genre name (e.g., Rock, Pop)
      t.album_key AS album_id,               -- Album ID
      al.album_name AS album_name,           -- Album name from dim_album
      al.artist_key AS artist_id,            -- Artist key from dim_album
      ar.artist_name AS artist_name,         -- Artist name from dim_artists
      c.last_name AS customer_last_name,     -- Customer's last name
      c.first_name AS customer_first_name,   -- Customer's first name
      c.City AS customer_city,               -- Customer's city
      d.day_name_of_week AS order_day_name_of_week, -- Day of the week for the order
      d.day_of_month AS order_day_of_month,         -- Day of the month for the order
      d.weekday_weekend AS order_weekday_weekend,   -- Weekday or weekend for the order
      d.month_name AS order_month_name,             -- Month of the order
      d.calendar_quarter AS order_quarter,          -- Quarter of the order
      d.calendar_year AS order_year                 -- Year of the order
  FROM sales_silver_tempview AS o
  LEFT JOIN chinook_dlh.dim_track AS t
    ON o.ProductID = t.track_key
  LEFT JOIN chinook_dlh.dim_genres AS g
    ON t.genre_key = g.genre_key
  LEFT JOIN chinook_dlh.dim_album AS al
    ON t.album_key = al.album_key
  LEFT JOIN chinook_dlh.dim_artists AS ar
    ON al.artist_key = ar.artist_key
  LEFT JOIN chinook_dlh.dim_customers AS c
    ON c.customer_key = o.CustomerID
  LEFT JOIN chinook_dlh.dim_date AS d
    ON o.date_key = d.date_key
);
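
Every dimension in the view above is attached with a `LEFT JOIN`, so a sales row is kept even when its key has no match in a dimension table; the dimension columns simply come back as NULL. The sketch below illustrates that behavior with Python's built-in `sqlite3` module rather than Spark. The tiny `sales` and `dim_track` tables and their rows are made up for illustration and are not taken from the Chinook data.

```python
import sqlite3

# In-memory database with a hypothetical fact table and one dimension table.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sales (InvoiceLineID INTEGER, ProductID INTEGER, Quantity INTEGER, UnitPrice REAL);
    CREATE TABLE dim_track (track_key INTEGER, track_name TEXT);
    INSERT INTO sales VALUES (1, 10, 1, 0.99), (2, 11, 2, 0.99), (3, 99, 1, 1.99);
    INSERT INTO dim_track VALUES (10, 'Track A'), (11, 'Track B');
""")

# LEFT JOIN keeps all three sales rows; ProductID 99 has no matching track.
rows = conn.execute("""
    SELECT o.InvoiceLineID, t.track_name
    FROM sales AS o
    LEFT JOIN dim_track AS t
      ON o.ProductID = t.track_key
    ORDER BY o.InvoiceLineID
""").fetchall()

# Rows whose dimension lookup failed show up with a NULL (None) track name.
unmatched = [line_id for line_id, track_name in rows if track_name is None]
print(rows)
print("Unmatched invoice lines:", unmatched)
```

A check like `unmatched` can be a quick way to spot fact rows whose keys are missing from a dimension before they reach the Silver table.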

In [0]:
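# Stream the enriched view into the Silver Delta table in append mode.
# The checkpoint location records progress so a restarted query resumes where it left off.
(spark.table("fact_sales_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{sales_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_sales_silver"))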

<pyspark.sql.streaming.query.StreamingQuery at 0x7f2e6357ff10>

In [0]:
%sql
SELECT * FROM fact_sales_silver

customer_key,invoice_key,invoice_line_key,track_key,Quantity,UnitPrice,line_total,date_key,track_name,track_composer,genre_id,genre_name,album_id,album_name,artist_id,artist_name,customer_last_name,customer_first_name,customer_city,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year
39,334,1813,539,1,0.99,0.99,20250107,Rita Lee,Arnaldo Baptista/Rita Lee/Sérgio Dias,4,Alternative & Punk,42,Minha História,57,Os Mutantes,Bernard,Camille,Paris,Tuesday,7,Weekday,January,1,2025
47,347,1875,901,1,0.99,0.99,20250305,After Midnight,Clapton/J. J. Cale,6,Blues,72,The Cream Of Clapton,81,Eric Clapton,Mancini,Lucas,Rome,Wednesday,5,Weekday,March,1,2025
37,138,748,1040,1,0.99,0.99,20220823,Summer Love,hans bradtke/heinz meier/johnny mercer,12,Easy Listening,83,My Way: The Best Of Frank Sinatra [Disc 1],85,Frank Sinatra,Zimmermann,Fynn,Frankfurt,Tuesday,23,Weekday,August,3,2022
17,243,1324,1071,1,0.99,0.99,20231201,Divirta-Se (Saindo Da Sua),,7,Latin,84,Roda De Funk,86,Funk Como Le Gusta,Smith,Jack,Redmond,Friday,1,Weekday,December,4,2023
34,246,1332,1118,1,0.99,0.99,20231222,Palco,Gilberto Gil,7,Latin,73,Unplugged,81,Eric Clapton,Fernandes,João,Lisbon,Friday,22,Weekday,December,4,2023
37,367,1986,1583,1,0.99,0.99,20250603,Going To California,Robert Plant,1,Rock,127,BBC Sessions [Disc 2] [Live],22,Led Zeppelin,Zimmermann,Fynn,Frankfurt,Tuesday,3,Weekday,June,2,2025
52,163,880,1845,1,0.99,0.99,20221216,Bleeding Me,"James Hetfield, Lars Ulrich, Kirk Hammett",3,Metal,151,Load,50,Metallica,Jones,Emma,London,Friday,16,Weekday,December,4,2022
3,165,887,1875,1,0.99,0.99,20221220,Ride The Lightning,Metallica,3,Metal,154,Ride The Lightning,50,Metallica,Tremblay,François,Montréal,Tuesday,20,Weekday,December,4,2022
51,271,1466,1909,1,0.99,0.99,20240403,The Meaning Of The Blues,"R. Troup, L. Worth",2,Jazz,157,Miles Ahead,68,Miles Davis,Johansson,Joakim,Stockholm,Wednesday,3,Weekday,April,2,2024
15,276,1490,2062,1,0.99,0.99,20240426,Life During Wartime,Chris Frantz/David Byrne/Jerry Harrison/Tina Weymouth,7,Latin,167,Acústico MTV,113,Os Paralamas Do Sucesso,Peterson,Jennifer,Vancouver,Friday,26,Weekday,April,2,2024


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.fact_sales_silver

col_name,data_type,comment
customer_key,bigint,
invoice_key,bigint,
invoice_line_key,bigint,
track_key,bigint,
Quantity,bigint,
UnitPrice,double,
line_total,double,
date_key,bigint,
track_name,string,
track_composer,string,


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS (`CREATE TABLE AS SELECT`) approach. The table lists the ten customers with the highest total spending, showing each customer's ID, last name, first name, and total amount spent, sorted from highest to lowest.

In [0]:
%sql
CREATE OR REPLACE TABLE chinook_dlh.fact_top_customers_gold AS (
  SELECT 
      customer_key AS CustomerID,
      customer_last_name AS LastName,
      customer_first_name AS FirstName,
      SUM(line_total) AS TotalSpent -- Aggregating total spending
  FROM chinook_dlh.fact_sales_silver
  GROUP BY CustomerID, LastName, FirstName
  ORDER BY TotalSpent DESC
  LIMIT 10 -- Limit to the top 10 customers
);

SELECT * FROM chinook_dlh.fact_top_customers_gold;

CustomerID,LastName,FirstName,TotalSpent
6,Holý,Helena,49.62
26,Cunningham,Richard,47.62
57,Rojas,Luis,46.61999999999999
45,Kovács,Ladislav,45.620000000000005
46,O'Reilly,Hugh,45.62
37,Zimmermann,Fynn,43.620000000000005
28,Barnett,Julia,43.62
24,Ralston,Frank,43.61999999999999
25,Stevens,Victor,42.620000000000005
7,Gruber,Astrid,42.620000000000005
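
Values such as `46.61999999999999` in `TotalSpent` are floating-point artifacts: `line_total` is a `double`, and most decimal prices cannot be represented exactly in binary. The sketch below shows the effect in plain Python and two common remedies, rounding the final sum or summing exact decimals. The list of line totals is hypothetical. In Spark SQL the analogous options would be wrapping the sum in `ROUND(..., 2)` or casting the price to a `DECIMAL` type, which is an assumption about how one might adapt the query, not something the notebook does.

```python
from decimal import Decimal

# Hypothetical line totals for one customer: six tracks at 0.99 and two at 1.99.
line_totals = ["0.99"] * 6 + ["1.99"] * 2

# Summing binary floats can leave a tiny representation error in the result.
float_total = sum(float(value) for value in line_totals)

# Remedy 1: round the aggregated value to two decimal places for presentation.
rounded_total = round(float_total, 2)

# Remedy 2: sum exact decimal values so no binary rounding error accumulates.
exact_total = sum(Decimal(value) for value in line_totals)

print("float sum:  ", repr(float_total))
print("rounded sum:", rounded_total)
print("decimal sum:", exact_total)
```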


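This Gold table shows the most popular genres for each month of each year. I like this table because it shows how preferences change over time. Note that `Month` holds the month name, so the `ORDER BY` sorts the months alphabetically within each year (April, August, December, and so on) rather than chronologically.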

In [0]:
%sql
CREATE OR REPLACE TABLE chinook_dlh.fact_genre_popularity_by_date_gold AS (
  SELECT 
      genre_name AS GenreName,
      order_year AS Year,                  -- Year from the date dimension
      order_month_name AS Month,           -- Month name from the date dimension
      COUNT(track_key) AS TracksPurchased  -- Number of tracks purchased
  FROM chinook_dlh.fact_sales_silver
  GROUP BY GenreName, Year, Month
  ORDER BY Year, Month, TracksPurchased DESC
);

SELECT * FROM chinook_dlh.fact_genre_popularity_by_date_gold;

GenreName,Year,Month,TracksPurchased
Rock,2021,April,22
Latin,2021,April,11
Blues,2021,April,3
Alternative & Punk,2021,April,1
Jazz,2021,April,1
Rock,2021,August,20
Latin,2021,August,10
Metal,2021,August,6
R&B/Soul,2021,August,2
Rock,2021,December,18
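
Because the result is ordered by month name, the months appear alphabetically. One way to get calendar order is to sort on a month number instead of the name. The sketch below does this in plain Python with a name-to-number lookup; the sample rows are hypothetical and only mimic the shape of the table above.

```python
# Map each English month name to its calendar position (January = 1).
MONTH_NAMES = [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
]
MONTH_NUMBER = {name: index for index, name in enumerate(MONTH_NAMES, start=1)}

# Hypothetical (GenreName, Year, Month, TracksPurchased) rows.
rows = [
    ("Rock", 2021, "January", 5),
    ("Latin", 2021, "January", 2),
    ("Rock", 2021, "April", 4),
    ("Rock", 2021, "December", 3),
    ("Jazz", 2021, "August", 1),
]

# Sorting on the month name reproduces the alphabetical order seen in the table.
alphabetical = sorted(rows, key=lambda r: (r[1], r[2], -r[3]))

# Sorting on the month number gives calendar order, most purchased genre first.
chronological = sorted(rows, key=lambda r: (r[1], MONTH_NUMBER[r[2]], -r[3]))

for row in chronological:
    print(row)
```

The same idea carries over to SQL if the date dimension exposes a numeric month column to sort on, which would need to be checked against `dim_date`.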


Just for fun, this Gold table shows the most popular genres across all recorded sales.

In [0]:
%sql
CREATE OR REPLACE TABLE chinook_dlh.fact_genre_popularity_gold AS (
  SELECT 
      genre_id AS GenreID,
      genre_name AS GenreName,
      COUNT(track_key) AS TracksPurchased -- Count the number of tracks purchased
  FROM chinook_dlh.fact_sales_silver
  GROUP BY GenreID, GenreName
  ORDER BY TracksPurchased DESC
);

SELECT * FROM chinook_dlh.fact_genre_popularity_gold;

GenreID,GenreName,TracksPurchased
1,Rock,835
7,Latin,386
3,Metal,264
4,Alternative & Punk,244
2,Jazz,80
6,Blues,61
19,TV Shows,47
24,Classical,41
14,R&B/Soul,41
8,Reggae,30


#### 7.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/project-data/chinook_dlh
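
If a streaming query is still active when the directory is removed, it may keep writing to, or fail on, the deleted checkpoint and table files. A cautious cleanup stops the active streams first. The helper below is a minimal sketch of that idea: `stop_streams_and_remove` is a hypothetical name, and it assumes the `spark` and `dbutils` objects that Databricks notebooks provide, using `spark.streams.active`, `StreamingQuery.stop()`, and `dbutils.fs.rm(path, True)` for the recursive delete.

```python
def stop_streams_and_remove(spark, dbutils, path):
    """Stop every active streaming query, then recursively delete `path`.

    `spark` and `dbutils` are passed in explicitly so the function can be
    exercised outside a Databricks notebook. Returns the ids of the
    queries that were stopped.
    """
    stopped_ids = []
    for query in spark.streams.active:
        query.stop()                  # Stop the query before its files disappear.
        stopped_ids.append(query.id)
    dbutils.fs.rm(path, True)         # Second argument True means recursive delete.
    return stopped_ids


# In a Databricks notebook the call would look like this (left commented out
# so the block also runs where `spark` and `dbutils` are not defined):
# stop_streams_and_remove(spark, dbutils, "/FileStore/project-data/chinook_dlh")
```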