## DS-2002: Final Project
This notebook demonstrates many of the software libraries and programming techniques needed to complete the end-of-semester capstone project for course **DS-2002: Data Science Systems** at the University of Virginia School of Data Science. The project is a capstone challenge that asks students to demonstrate a practical, working understanding of each of the data systems and architectural principles covered during the semester.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
- Online Transaction Processing Systems (OLTP): *Relational Databases Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
- Online Analytical Processing Systems (OLAP): *Relational Databases Optimized for Read/Aggregation Operations; Dimensional Model (i.e., Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
- Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

The project also requires students to make and justify a decision about whether to implement a cloud-hosted, on-premises, or hybrid architecture.

### Section 1: Prerequisites
 
#### 1.0 Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0 Instantiate Global Variables

In [0]:
# Azure Database for MySQL Connection Information (MySQL on port 3306, read with the MariaDB JDBC driver)
azuresql_hostname = "ds2002-sql.mysql.database.azure.com"
azuresql_port = 3306
src_database = "chinook"

connection_properties = {
  "user" : "rquillian",
  "password" : "password1!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information 
atlas_cluster_name = "sandbox"
atlas_database_name = "chinook"
atlas_user_name = "rlq3fm"
atlas_password = "Password123!"

# Data Files (JSON) Information
dst_database = "chinook_dw"

base_dir = "dbfs:/FileStore/project"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

output_bronze = f"{database_dir}/fact_invoices/bronze"
output_silver = f"{database_dir}/fact_invoices/silver"
output_gold   = f"{database_dir}/fact_invoices/gold"

# Delete the Streaming Files 
#dbutils.fs.rm(f"{database_dir}/fact_invoices", True)

# Delete the Database Files
#dbutils.fs.rm(database_dir, True)
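Hard-coding passwords in a notebook exposes them to anyone who can open it. The following is a minimal sketch that builds the same connection_properties dictionary from the environment instead. It assumes hypothetical environment variable names (CHINOOK_SQL_USER and CHINOOK_SQL_PASSWORD). On Databricks, the platform-native alternative is a secret scope read with dbutils.secrets.get(scope=..., key=...).

```python
import os


def get_required_env(name: str) -> str:
    """Return the value of an environment variable, failing loudly if it is unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise KeyError(f"Missing required environment variable: {name}")
    return value


def build_connection_properties(user_var: str = "CHINOOK_SQL_USER",
                                password_var: str = "CHINOOK_SQL_PASSWORD") -> dict:
    """Build the JDBC properties dict without embedding credentials in the notebook.

    The default variable names are hypothetical placeholders.
    """
    return {
        "user": get_required_env(user_var),
        "password": get_required_env(password_var),
        "driver": "org.mariadb.jdbc.Driver",
    }
```

With the variables set on the cluster, `connection_properties = build_connection_properties()` would replace the literal dictionary above.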

#### 3.0 Define Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    # Build a JDBC URL for the Azure Database for MySQL server (same form as the JDBC cells below).
    jdbc_url = f"jdbc:mysql://{host_name}:{port}/{db_name}"

    # spark.read.jdbc() returns a Spark DataFrame (not a pandas DataFrame). The `table` argument
    # may be a table name or a parenthesized subquery with an alias, e.g. "(SELECT * FROM track) AS t".
    dframe = spark.read.jdbc(url=jdbc_url, table=sql_query, properties=conn_props)

    return dframe
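The table argument of spark.read.jdbc() also accepts a parenthesized subquery with an alias. That means a query can be pushed down to MySQL instead of reading a whole table. Below is a small sketch of two hypothetical helpers: one builds the URL and one wraps a query in that form. The alias name src is an arbitrary choice.

```python
def build_mysql_jdbc_url(host_name: str, port: int, db_name: str) -> str:
    """Build a MySQL-style JDBC URL in the same form the notebook's JDBC cells use."""
    return f"jdbc:mysql://{host_name}:{port}/{db_name}"


def as_jdbc_table(sql_query: str, alias: str = "src") -> str:
    """Wrap a SELECT so it can be passed where Spark's JDBC reader expects a table name.

    A trailing semicolon is removed because the query becomes a derived table.
    """
    query = sql_query.strip().rstrip(";")
    return f"({query}) AS {alias}"
```

For example, passing `as_jdbc_table("SELECT TrackId, Name FROM track")` as the `sql_query` argument of `get_sql_dataframe()` would fetch only those two columns.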


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    # Build the connection string; the "zibbf" host suffix is specific to this Atlas cluster.
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zibbf.mongodb.net/{db_name}?retryWrites=true&w=majority"

    # The context manager closes the client even if the query raises.
    with pymongo.MongoClient(mongo_uri) as client:
        db = client[db_name]

        # Apply the filter and projection whenever they are supplied (an empty filter matches
        # every document), and sort only when a sort specification is given.
        cursor = db[collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)

        # Materialize the cursor into a list of documents before the client closes.
        dframe = pd.DataFrame(list(cursor))

    return dframe


# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    # Create a client connection to MongoDB
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zibbf.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    # json_files maps each collection name to the JSON file that populates it;
    # each collection is dropped and then recreated from its file.
    result = None
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(src_file_path, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        result = db[collection_name].insert_many(json_object)

    client.close()

    # Returns the InsertManyResult of the last collection loaded (None if json_files is empty).
    return result
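set_mongo_collection() expects json_files to be a dictionary that maps each collection name to the file that populates it. The following is a sketch of a hypothetical helper that derives that mapping from a list of file names. It assumes each file is named after its collection, e.g. customers.json for the customers collection read in the Scala cells below.

```python
import os


def build_json_file_map(file_names):
    """Map each collection name to its JSON file name, e.g. 'customers' -> 'customers.json'.

    Files without a .json extension are ignored.
    """
    json_files = {}
    for file_name in sorted(file_names):
        stem, ext = os.path.splitext(file_name)
        if ext.lower() == ".json":
            json_files[stem] = file_name
    return json_files
```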


### Section 2: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0 Get Reference Data from an Azure Database for MySQL Server

In [0]:
%sql
DROP DATABASE IF EXISTS chinook_dw CASCADE;

In [0]:
%sql
CREATE DATABASE chinook_dw
COMMENT "Capstone Project Database"
LOCATION "dbfs:/FileStore/project/chinook_dw"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone Project");

In [0]:
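# Reuse the connection settings defined in section 2.0 instead of repeating them.
jdbc_url = f"jdbc:mysql://{azuresql_hostname}:{azuresql_port}/{src_database}"

track = (spark.read
  .format("jdbc")
  .option("driver", connection_properties["driver"])
  .option("url", jdbc_url)
  .option("dbtable", "track")
  .option("user", connection_properties["user"])
  .option("password", connection_properties["password"])
  .load()
)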

In [0]:
%sql
CREATE TEMPORARY VIEW track_view
USING JDBC
OPTIONS (
  url "jdbc:mysql://ds2002-sql.mysql.database.azure.com:3306/chinook",
  dbtable "track",
  user 'rquillian',
  password 'password1!'
)

In [0]:
%sql
CREATE TEMPORARY VIEW album_view
USING JDBC
OPTIONS (
  url "jdbc:mysql://ds2002-sql.mysql.database.azure.com:3306/chinook",
  dbtable "album",
  user 'rquillian',
  password 'password1!'
)

In [0]:
%sql
CREATE TEMPORARY VIEW artist_view
USING JDBC
OPTIONS (
  url "jdbc:mysql://ds2002-sql.mysql.database.azure.com:3306/chinook",
  dbtable "artist",
  user 'rquillian',
  password 'password1!'
)

In [0]:
%sql
CREATE TEMPORARY VIEW genre_view
USING JDBC
OPTIONS (
  url "jdbc:mysql://ds2002-sql.mysql.database.azure.com:3306/chinook",
  dbtable "genre",
  user 'rquillian',
  password 'password1!'
)

In [0]:
%sql
CREATE TEMPORARY VIEW mediatype_view
USING JDBC
OPTIONS (
  url "jdbc:mysql://ds2002-sql.mysql.database.azure.com:3306/chinook",
  dbtable "mediatype",
  user 'rquillian',
  password 'password1!'
)
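The five view definitions above differ only in the table name. As a sketch, the statements can be generated in a loop. build_view_sql is a hypothetical helper. Unlike the cells above, it uses CREATE OR REPLACE, so re-running the notebook does not fail on views that already exist.

```python
SOURCE_TABLES = ["track", "album", "artist", "genre", "mediatype"]


def build_view_sql(table, jdbc_url, user, password):
    """Return a CREATE OR REPLACE TEMPORARY VIEW ... USING JDBC statement for one table."""
    return (
        f"CREATE OR REPLACE TEMPORARY VIEW {table}_view\n"
        "USING JDBC\n"
        "OPTIONS (\n"
        f'  url "{jdbc_url}",\n'
        f'  dbtable "{table}",\n'
        f"  user '{user}',\n"
        f"  password '{password}'\n"
        ")"
    )

# In a Databricks notebook, each statement could then be executed with spark.sql():
# for table in SOURCE_TABLES:
#     spark.sql(build_view_sql(table, jdbc_url, user, password))
```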

In [0]:
%sql
USE DATABASE chinook_dw;

CREATE TABLE IF NOT EXISTS chinook_dw.dim_tracks
COMMENT "Tracks Dimension Table"
LOCATION "dbfs:/FileStore/project/chinook_dw/dim_tracks"

AS SELECT 
  t.TrackId,
  t.Name AS TrackName,
  t.AlbumId,
  a.Title AS AlbumTitle,
  a.ArtistId,
  ar.Name AS ArtistName,
  t.MediaTypeId,
  m.Name AS MediaTypeName,
  t.GenreId,
  g.Name AS GenreName,
  t.Composer,
  t.Milliseconds,
  t.Bytes,
  t.UnitPrice
  FROM track_view AS t
  LEFT OUTER JOIN album_view AS a
  ON t.AlbumId = a.AlbumId
  LEFT OUTER JOIN artist_view AS ar
  ON a.ArtistId = ar.ArtistId
  LEFT OUTER JOIN genre_view AS g
  ON t.GenreId = g.GenreId
  LEFT OUTER JOIN mediatype_view AS m
  ON t.MediaTypeId = m.MediaTypeId


In [0]:
%sql
SELECT * FROM chinook_dw.dim_tracks LIMIT 5

TrackId,TrackName,AlbumId,AlbumTitle,ArtistId,ArtistName,MediaTypeId,MediaTypeName,GenreId,GenreName,Composer,Milliseconds,Bytes,UnitPrice
14,Spellbound,1,For Those About To Rock We Salute You,1,AC/DC,1,MPEG audio file,1,Rock,"Angus Young, Malcolm Young, Brian Johnson",270863,8817038,0.99
13,Night Of The Long Knives,1,For Those About To Rock We Salute You,1,AC/DC,1,MPEG audio file,1,Rock,"Angus Young, Malcolm Young, Brian Johnson",205688,6706347,0.99
12,Breaking The Rules,1,For Those About To Rock We Salute You,1,AC/DC,1,MPEG audio file,1,Rock,"Angus Young, Malcolm Young, Brian Johnson",263288,8596840,0.99
11,C.O.D.,1,For Those About To Rock We Salute You,1,AC/DC,1,MPEG audio file,1,Rock,"Angus Young, Malcolm Young, Brian Johnson",199836,6566314,0.99
10,Evil Walks,1,For Those About To Rock We Salute You,1,AC/DC,1,MPEG audio file,1,Rock,"Angus Young, Malcolm Young, Brian Johnson",263497,8611245,0.99
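dim_tracks denormalizes five normalized OLTP tables into a single dimension. The following is a pure-Python sketch of that flattening. Dictionaries keyed by primary key stand in for the views. Every lookup here behaves like a LEFT OUTER JOIN and returns None when no match exists.

```python
def build_dim_tracks(tracks, albums, artists, genres, media_types):
    """Flatten track rows with their album, artist, genre, and media-type names.

    Each lookup argument is a dict keyed by its primary key; missing keys yield None.
    """
    rows = []
    for t in tracks:
        album = albums.get(t["AlbumId"], {})
        artist = artists.get(album.get("ArtistId"), {})
        rows.append({
            "TrackId": t["TrackId"],
            "TrackName": t["Name"],
            "AlbumId": t["AlbumId"],
            "AlbumTitle": album.get("Title"),
            "ArtistId": album.get("ArtistId"),
            "ArtistName": artist.get("Name"),
            "GenreName": genres.get(t["GenreId"], {}).get("Name"),
            "MediaTypeName": media_types.get(t["MediaTypeId"], {}).get("Name"),
        })
    return rows
```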


#### 2.0 Fetch Data from MongoDB
##### Start by pulling the data into dataframes

In [0]:
%scala
import com.mongodb.spark._

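// No connection URI is set here, so the MongoDB Spark connector ("DefaultSource" format, v3.x)
// reads it from the cluster's Spark configuration (spark.mongodb.input.uri).
val df_customers = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "chinook").option("collection", "customers").load()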
val dim_customers = df_customers.drop("Fax","_id")
display(dim_customers)

Address,City,Company,Country,CustomerId,Email,FirstName,LastName,Phone,PostalCode,State,SupportRepId
"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,Embraer - Empresa Brasileira de Aeronáutica S.A.,Brazil,1,luisg@embraer.com.br,Luís,Gonçalves,+55 (12) 3923-5555,12227-000,SP,3
Theodor-Heuss-Straße 34,Stuttgart,,Germany,2,leonekohler@surfeu.de,Leonie,Köhler,+49 0711 2842222,70174,,5
1498 rue Bélanger,Montréal,,Canada,3,ftremblay@gmail.com,François,Tremblay,+1 (514) 721-4711,H2G 1A7,QC,3
Ullevålsveien 14,Oslo,,Norway,4,bjorn.hansen@yahoo.no,Bjørn,Hansen,+47 22 44 22 22,0171,,4
Klanova 9/506,Prague,JetBrains s.r.o.,Czech Republic,5,frantisekw@jetbrains.com,František,Wichterlová,+420 2 4172 5555,14700,,4
Rilská 3174/6,Prague,,Czech Republic,6,hholy@gmail.com,Helena,Holý,+420 2 4177 0449,14300,,5
"Rotenturmstraße 4, 1010 Innere Stadt",Vienne,,Austria,7,astrid.gruber@apple.at,Astrid,Gruber,+43 01 5134505,1010,,5
Grétrystraat 63,Brussels,,Belgium,8,daan_peeters@apple.be,Daan,Peeters,+32 02 219 03 03,1000,,4
Sønder Boulevard 51,Copenhagen,,Denmark,9,kara.nielsen@jubii.dk,Kara,Nielsen,+453 3331 9991,1720,,4
"Rua Dr. Falcão Filho, 155",São Paulo,Woodstock Discos,Brazil,10,eduardo@woodstock.com.br,Eduardo,Martins,+55 (11) 3033-5446,01007-010,SP,4

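The Scala cells load full documents and then drop _id and Fax (or BirthDate and HireDate). The same fields could instead be excluded at the source by passing an exclusion projection such as {"_id": 0, "Fax": 0} to find(). The sketch below is a simplified pure-Python model of what such a projection does to each document. It handles top-level fields only and is not MongoDB's implementation.

```python
def apply_exclusion_projection(document, projection):
    """Drop every top-level field whose projection value is 0.

    A simplified model of a MongoDB exclusion projection; nested fields and
    inclusion projections are not handled.
    """
    excluded = {field for field, flag in projection.items() if flag == 0}
    return {key: value for key, value in document.items() if key not in excluded}
```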

In [0]:
%scala
import com.mongodb.spark._

val df_employees = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "chinook").option("collection", "employees").load()
val dim_employees = df_employees.drop("BirthDate", "HireDate", "_id")
display(dim_employees)

Address,City,Country,Email,EmployeeId,Fax,FirstName,LastName,Phone,PostalCode,ReportsTo,State,Title
11120 Jasper Ave NW,Edmonton,Canada,andrew@chinookcorp.com,1,+1 (780) 428-3457,Andrew,Adams,+1 (780) 428-9482,T5K 2N1,0,AB,General Manager
825 8 Ave SW,Calgary,Canada,nancy@chinookcorp.com,2,+1 (403) 262-3322,Nancy,Edwards,+1 (403) 262-3443,T2P 2T3,1,AB,Sales Manager
1111 6 Ave SW,Calgary,Canada,jane@chinookcorp.com,3,+1 (403) 262-6712,Jane,Peacock,+1 (403) 262-3443,T2P 5M5,2,AB,Sales Support Agent
683 10 Street SW,Calgary,Canada,margaret@chinookcorp.com,4,+1 (403) 263-4289,Margaret,Park,+1 (403) 263-4423,T2P 5G3,2,AB,Sales Support Agent
7727B 41 Ave,Calgary,Canada,steve@chinookcorp.com,5,1 (780) 836-9543,Steve,Johnson,1 (780) 836-9987,T3B 1Y7,2,AB,Sales Support Agent
5827 Bowness Road NW,Calgary,Canada,michael@chinookcorp.com,6,+1 (403) 246-9899,Michael,Mitchell,+1 (403) 246-9887,T3B 0C5,1,AB,IT Manager
590 Columbia Boulevard West,Lethbridge,Canada,robert@chinookcorp.com,7,+1 (403) 456-8485,Robert,King,+1 (403) 456-9986,T1K 5N8,6,AB,IT Staff
923 7 ST NW,Lethbridge,Canada,laura@chinookcorp.com,8,+1 (403) 467-8772,Laura,Callahan,+1 (403) 467-3351,T1H 1Y8,6,AB,IT Staff


##### Now write the dataframes to the Databricks chinook_dw database

In [0]:
%scala
dim_customers.write.format("delta").mode("overwrite").saveAsTable("chinook_dw.dim_customers")

In [0]:
%scala
dim_employees.write.format("delta").mode("overwrite").saveAsTable("chinook_dw.dim_employees")

##### Query the new tables to verify they loaded correctly

In [0]:
%sql
SELECT * FROM chinook_dw.dim_customers LIMIT 5     

Address,City,Company,Country,CustomerId,Email,FirstName,LastName,Phone,PostalCode,State,SupportRepId
"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,Embraer - Empresa Brasileira de Aeronáutica S.A.,Brazil,1,luisg@embraer.com.br,Luís,Gonçalves,+55 (12) 3923-5555,12227-000,SP,3
Theodor-Heuss-Straße 34,Stuttgart,,Germany,2,leonekohler@surfeu.de,Leonie,Köhler,+49 0711 2842222,70174,,5
1498 rue Bélanger,Montréal,,Canada,3,ftremblay@gmail.com,François,Tremblay,+1 (514) 721-4711,H2G 1A7,QC,3
Ullevålsveien 14,Oslo,,Norway,4,bjorn.hansen@yahoo.no,Bjørn,Hansen,+47 22 44 22 22,0171,,4
Klanova 9/506,Prague,JetBrains s.r.o.,Czech Republic,5,frantisekw@jetbrains.com,František,Wichterlová,+420 2 4172 5555,14700,,4


In [0]:
%sql
SELECT * FROM chinook_dw.dim_employees LIMIT 5     

Address,City,Country,Email,EmployeeId,Fax,FirstName,LastName,Phone,PostalCode,ReportsTo,State,Title
11120 Jasper Ave NW,Edmonton,Canada,andrew@chinookcorp.com,1,+1 (780) 428-3457,Andrew,Adams,+1 (780) 428-9482,T5K 2N1,0,AB,General Manager
825 8 Ave SW,Calgary,Canada,nancy@chinookcorp.com,2,+1 (403) 262-3322,Nancy,Edwards,+1 (403) 262-3443,T2P 2T3,1,AB,Sales Manager
1111 6 Ave SW,Calgary,Canada,jane@chinookcorp.com,3,+1 (403) 262-6712,Jane,Peacock,+1 (403) 262-3443,T2P 5M5,2,AB,Sales Support Agent
683 10 Street SW,Calgary,Canada,margaret@chinookcorp.com,4,+1 (403) 263-4289,Margaret,Park,+1 (403) 263-4423,T2P 5G3,2,AB,Sales Support Agent
7727B 41 Ave,Calgary,Canada,steve@chinookcorp.com,5,1 (780) 836-9543,Steve,Johnson,1 (780) 836-9987,T3B 1Y7,2,AB,Sales Support Agent


#### 3.0 Fetch Data from a File System

The last dimension table (dim_date) is loaded directly from a file system.

Note that the dim_date CSV file was uploaded beforehand through data > DBFS > Upload and placed in the batch directory (batch_dir) defined at the beginning of this notebook.

In [0]:
date_csv = f"{batch_dir}/dim_date.csv"

df_date = spark.read.format('csv').options(header='true', inferSchema='true').load(date_csv)
display(df_date)

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01T00:00:00.000+0000,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000102,2000-01-02T00:00:00.000+0000,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000103,2000-01-03T00:00:00.000+0000,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000104,2000-01-04T00:00:00.000+0000,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000105,2000-01-05T00:00:00.000+0000,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000106,2000-01-06T00:00:00.000+0000,2000/01/06,01/06/2000,06/01/2000,5,Thursday,6,6,Weekday,1,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000107,2000-01-07T00:00:00.000+0000,2000/01/07,01/07/2000,07/01/2000,6,Friday,7,7,Weekday,1,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000108,2000-01-08T00:00:00.000+0000,2000/01/08,01/08/2000,08/01/2000,7,Saturday,8,8,Weekend,1,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000109,2000-01-09T00:00:00.000+0000,2000/01/09,01/09/2000,09/01/2000,1,Sunday,9,9,Weekend,1,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000110,2000-01-10T00:00:00.000+0000,2000/01/10,01/10/2000,10/01/2000,2,Monday,10,10,Weekday,2,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
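Several dim_date columns can be derived from the date itself. This sketch uses the following conventions, which are inferred from the sample rows above and not documented by the source file:

- Sunday = 1 through Saturday = 7
- ISO week numbers
- a fiscal year that starts in July

For example, 2000-01-01 has day_of_week 7, week_of_year 52 and fiscal_month_of_year 7.

```python
from datetime import date


def date_attributes(d: date) -> dict:
    """Derive a few dim_date columns for one calendar date (conventions inferred from sample rows)."""
    day_of_week = d.isoweekday() % 7 + 1   # ISO Mon=1..Sun=7 -> Sun=1..Sat=7
    fiscal_month = (d.month - 7) % 12 + 1  # July is fiscal month 1
    return {
        "date_key": int(d.strftime("%Y%m%d")),
        "day_of_week": day_of_week,
        "weekday_weekend": "Weekend" if day_of_week in (1, 7) else "Weekday",
        "week_of_year": d.isocalendar()[1],
        "fiscal_month_of_year": fiscal_month,
        "fiscal_quarter": (fiscal_month - 1) // 3 + 1,
    }
```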


In [0]:
df_date.write.format("delta").mode("overwrite").saveAsTable("chinook_dw.dim_date")

In [0]:
%sql
SELECT * FROM chinook_dw.dim_date LIMIT 5;

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01T00:00:00.000+0000,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000102,2000-01-02T00:00:00.000+0000,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000103,2000-01-03T00:00:00.000+0000,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000104,2000-01-04T00:00:00.000+0000,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3
20000105,2000-01-05T00:00:00.000+0000,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01-01T00:00:00.000+0000,2000Q1,7,3,2000,2000-07-01T00:00:00.000+0000,2000Q3


All four dimension tables are now loaded, as the table listing below shows:

In [0]:
%sql
USE chinook_dw;
SHOW TABLES

database,tableName,isTemporary
chinook_dw,dim_customers,False
chinook_dw,dim_date,False
chinook_dw,dim_employees,False
chinook_dw,dim_tracks,False
,album_view,True
,artist_view,True
,genre_view,True
,mediatype_view,True
,track_view,True


### Section 3: Integrate Reference Data (Dimension Tables) with Real-Time Data

#### Bronze Table

The pipeline starts with a bronze table that ingests the raw JSON data.

The source data was created by splitting the fact_invoices table into 10 JSON files of 224 records each (2,240 records in total).
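The script used to split the data is not shown. The following is a minimal sketch of one way to do it, assuming the records are already a Python list. Each chunk is written as a single JSON array, which is why the stream reader sets multiLine to true. The file names follow the invoices01.json through invoices10.json pattern visible in the source_file column.

```python
import json
import os


def split_into_json_files(records, out_dir, num_files=10, prefix="invoices"):
    """Write records into num_files JSON-array files of (nearly) equal size.

    Returns the list of written paths, named prefix01.json, prefix02.json, ...
    """
    os.makedirs(out_dir, exist_ok=True)
    chunk_size = -(-len(records) // num_files)  # ceiling division
    paths = []
    for i in range(num_files):
        chunk = records[i * chunk_size:(i + 1) * chunk_size]
        path = os.path.join(out_dir, f"{prefix}{i + 1:02d}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(chunk, f)
        paths.append(path)
    return paths
```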

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("invoices_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW invoices_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM invoices_raw_tempview
)

In [0]:
%sql
SELECT * FROM invoices_bronze_tempview

billing_address,billing_city,billing_country,billing_postal_code,billing_state,customer_key,fact_invoice_key,invoice_date_key,invoice_key,invoice_total,quantity,track_key,unit_price,_rescued_data,receipt_time,source_file
"Praça Pio X, 119",Rio de Janeiro,Brazil,20040-020,RJ,12,2017,20130703,373,3.96,1,1793,0.99,,2022-12-06T13:43:41.899+0000,dbfs:/FileStore/project/source_data/stream/invoices10.json
"Praça Pio X, 119",Rio de Janeiro,Brazil,20040-020,RJ,12,2018,20130703,373,3.96,1,1795,0.99,,2022-12-06T13:43:41.899+0000,dbfs:/FileStore/project/source_data/stream/invoices10.json
"Praça Pio X, 119",Rio de Janeiro,Brazil,20040-020,RJ,12,2019,20130703,373,3.96,1,1797,0.99,,2022-12-06T13:43:41.899+0000,dbfs:/FileStore/project/source_data/stream/invoices10.json
"Praça Pio X, 119",Rio de Janeiro,Brazil,20040-020,RJ,12,2020,20130703,373,3.96,1,1799,0.99,,2022-12-06T13:43:41.899+0000,dbfs:/FileStore/project/source_data/stream/invoices10.json
1600 Amphitheatre Parkway,Mountain View,USA,94043-1351,CA,16,2021,20130704,374,5.94,1,1803,0.99,,2022-12-06T13:43:41.899+0000,dbfs:/FileStore/project/source_data/stream/invoices10.json
1600 Amphitheatre Parkway,Mountain View,USA,94043-1351,CA,16,2022,20130704,374,5.94,1,1807,0.99,,2022-12-06T13:43:41.899+0000,dbfs:/FileStore/project/source_data/stream/invoices10.json
1600 Amphitheatre Parkway,Mountain View,USA,94043-1351,CA,16,2023,20130704,374,5.94,1,1811,0.99,,2022-12-06T13:43:41.899+0000,dbfs:/FileStore/project/source_data/stream/invoices10.json
1600 Amphitheatre Parkway,Mountain View,USA,94043-1351,CA,16,2024,20130704,374,5.94,1,1815,0.99,,2022-12-06T13:43:41.899+0000,dbfs:/FileStore/project/source_data/stream/invoices10.json
1600 Amphitheatre Parkway,Mountain View,USA,94043-1351,CA,16,2025,20130704,374,5.94,1,1819,0.99,,2022-12-06T13:43:41.899+0000,dbfs:/FileStore/project/source_data/stream/invoices10.json
1600 Amphitheatre Parkway,Mountain View,USA,94043-1351,CA,16,2026,20130704,374,5.94,1,1823,0.99,,2022-12-06T13:43:41.899+0000,dbfs:/FileStore/project/source_data/stream/invoices10.json


In [0]:
(spark.table("invoices_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_invoices_bronze"))

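The checkpointLocation is what allows a restarted stream to skip files it has already ingested. The toy model below illustrates the idea with a JSON list of processed file names. Auto Loader's actual checkpoint format is internal and different, so treat this only as an illustration of the bookkeeping.

```python
import json
import os


def ingest_new_files(source_files, checkpoint_path):
    """Return the files not yet recorded in the checkpoint, then record them as processed."""
    processed = set()
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path, encoding="utf-8") as f:
            processed = set(json.load(f))
    new_files = sorted(set(source_files) - processed)
    with open(checkpoint_path, "w", encoding="utf-8") as f:
        json.dump(sorted(processed | set(new_files)), f)
    return new_files
```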
#### Silver Table

The silver table selects the invoice columns in a fixed order. It carries forward the traceability metadata added in the bronze step (receipt_time and source_file).

In [0]:
(spark.readStream
  .table("fact_invoices_bronze")
  .createOrReplaceTempView("invoices_silver_tempview"))

In [0]:
%sql
SELECT * FROM invoices_silver_tempview

billing_address,billing_city,billing_country,billing_postal_code,billing_state,customer_key,fact_invoice_key,invoice_date_key,invoice_key,invoice_total,quantity,track_key,unit_price,_rescued_data,receipt_time,source_file
162 E Superior Street,Chicago,USA,60611,IL,24,1793,20121230,332,5.94,1,411,0.99,,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
162 E Superior Street,Chicago,USA,60611,IL,24,1794,20121230,332,5.94,1,415,0.99,,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
162 E Superior Street,Chicago,USA,60611,IL,24,1795,20121230,332,5.94,1,419,0.99,,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
162 E Superior Street,Chicago,USA,60611,IL,24,1796,20121230,332,5.94,1,423,0.99,,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
162 E Superior Street,Chicago,USA,60611,IL,24,1797,20121230,332,5.94,1,427,0.99,,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
162 E Superior Street,Chicago,USA,60611,IL,24,1798,20121230,332,5.94,1,431,0.99,,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
230 Elgin Street,Ottawa,Canada,K2P 1L7,ON,30,1799,20130102,333,8.91,1,437,0.99,,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
230 Elgin Street,Ottawa,Canada,K2P 1L7,ON,30,1800,20130102,333,8.91,1,443,0.99,,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
230 Elgin Street,Ottawa,Canada,K2P 1L7,ON,30,1801,20130102,333,8.91,1,449,0.99,,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
230 Elgin Street,Ottawa,Canada,K2P 1L7,ON,30,1802,20130102,333,8.91,1,455,0.99,,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json


In [0]:
%sql
DESCRIBE EXTENDED invoices_silver_tempview

col_name,data_type,comment
billing_address,string,
billing_city,string,
billing_country,string,
billing_postal_code,string,
billing_state,string,
customer_key,bigint,
fact_invoice_key,bigint,
invoice_date_key,bigint,
invoice_key,bigint,
invoice_total,double,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_invoices_silver_tempview AS (
  SELECT 
      fact_invoice_key
    , invoice_key
    , customer_key
    , track_key
    , invoice_date_key
    , billing_address
    , billing_city
    , billing_state
    , billing_country
    , billing_postal_code
    , unit_price
    , quantity
    , invoice_total
    , receipt_time 
    , source_file
FROM invoices_silver_tempview);
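The silver view keeps an explicit column list. That list drops Auto Loader's _rescued_data column and fixes the column order. The following is a sketch of the same projection on plain Python dictionaries.

```python
SILVER_COLUMNS = [
    "fact_invoice_key", "invoice_key", "customer_key", "track_key", "invoice_date_key",
    "billing_address", "billing_city", "billing_state", "billing_country",
    "billing_postal_code", "unit_price", "quantity", "invoice_total",
    "receipt_time", "source_file",
]


def to_silver(bronze_record):
    """Keep only the silver columns, in SILVER_COLUMNS order; other keys (e.g. _rescued_data) are dropped."""
    return {column: bronze_record.get(column) for column in SILVER_COLUMNS}
```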

In [0]:
(spark.table("fact_invoices_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_invoices_silver"))

In [0]:
%sql
SELECT * FROM fact_invoices_silver

fact_invoice_key,invoice_key,customer_key,track_key,invoice_date_key,billing_address,billing_city,billing_state,billing_country,billing_postal_code,unit_price,quantity,invoice_total,receipt_time,source_file
1793,332,24,411,20121230,162 E Superior Street,Chicago,IL,USA,60611,0.99,1,5.94,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
1794,332,24,415,20121230,162 E Superior Street,Chicago,IL,USA,60611,0.99,1,5.94,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
1795,332,24,419,20121230,162 E Superior Street,Chicago,IL,USA,60611,0.99,1,5.94,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
1796,332,24,423,20121230,162 E Superior Street,Chicago,IL,USA,60611,0.99,1,5.94,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
1797,332,24,427,20121230,162 E Superior Street,Chicago,IL,USA,60611,0.99,1,5.94,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
1798,332,24,431,20121230,162 E Superior Street,Chicago,IL,USA,60611,0.99,1,5.94,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
1799,333,30,437,20130102,230 Elgin Street,Ottawa,ON,Canada,K2P 1L7,0.99,1,8.91,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
1800,333,30,443,20130102,230 Elgin Street,Ottawa,ON,Canada,K2P 1L7,0.99,1,8.91,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
1801,333,30,449,20130102,230 Elgin Street,Ottawa,ON,Canada,K2P 1L7,0.99,1,8.91,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json
1802,333,30,455,20130102,230 Elgin Street,Ottawa,ON,Canada,K2P 1L7,0.99,1,8.91,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices09.json


In [0]:
%sql
/* Verify that the first 448 records (the first two files) were also loaded correctly */
SELECT * FROM fact_invoices_silver
WHERE fact_invoice_key BETWEEN 0 AND 448

fact_invoice_key,invoice_key,customer_key,track_key,invoice_date_key,billing_address,billing_city,billing_state,billing_country,billing_postal_code,unit_price,quantity,invoice_total,receipt_time,source_file
225,40,36,1376,20090615,Tauentzienstraße 8,Berlin,,Germany,10789,0.99,1,13.86,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices02.json
226,41,50,1390,20090623,C/ San Bernardo 85,Madrid,,Spain,28015,0.99,1,0.99,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices02.json
227,42,51,1391,20090706,Celsiusg. 9,Stockholm,,Sweden,11230,0.99,1,1.98,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices02.json
228,42,51,1392,20090706,Celsiusg. 9,Stockholm,,Sweden,11230,0.99,1,1.98,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices02.json
229,43,53,1394,20090706,113 Lupus St,London,,United Kingdom,SW1V 3EN,0.99,1,1.98,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices02.json
230,43,53,1396,20090706,113 Lupus St,London,,United Kingdom,SW1V 3EN,0.99,1,1.98,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices02.json
231,44,55,1398,20090707,421 Bourke Street,Sidney,NSW,Australia,2010,0.99,1,3.96,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices02.json
232,44,55,1400,20090707,421 Bourke Street,Sidney,NSW,Australia,2010,0.99,1,3.96,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices02.json
233,44,55,1402,20090707,421 Bourke Street,Sidney,NSW,Australia,2010,0.99,1,3.96,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices02.json
234,44,55,1404,20090707,421 Bourke Street,Sidney,NSW,Australia,2010,0.99,1,3.96,2022-12-06T13:46:07.005+0000,dbfs:/FileStore/project/source_data/stream/invoices02.json
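Each file holds 224 consecutive records, so the expected fact_invoice_key range of each file follows directly. The keys in the outputs above are consistent with this: 225 at the start of invoices02.json, 1793 in invoices09.json, and 2017 in invoices10.json. It also means BETWEEN 0 AND 448 should return exactly the 448 records of the first two files. The following is a small sketch, assuming keys start at 1.

```python
def key_range_for_file(file_number, records_per_file=224):
    """Return the (first, last) fact_invoice_key expected in invoicesNN.json."""
    first = (file_number - 1) * records_per_file + 1
    return first, file_number * records_per_file
```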


#### Gold Table: Performing Aggregations

To verify that the pipeline works end to end, this query returns:

- Each customer's last name
- The total number of tracks each customer purchased
- The total amount each customer spent
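The following is a sketch of the same aggregation in plain Python, with two design choices:

- Spend is computed as unit_price * quantity. Every quantity in this data set appears to be 1, so the two formulas agree here.
- Totals are keyed by customer_key rather than by last name, so two customers sharing a surname would not be merged.

```python
from collections import defaultdict


def customer_totals(invoice_lines):
    """Sum quantity and spend (unit_price * quantity) per customer_key, rounding spend to cents."""
    totals = defaultdict(lambda: {"total_quantity": 0, "total_spent": 0.0})
    for line in invoice_lines:
        entry = totals[line["customer_key"]]
        entry["total_quantity"] += line["quantity"]
        entry["total_spent"] += line["unit_price"] * line["quantity"]
    return {
        key: {"total_quantity": v["total_quantity"], "total_spent": round(v["total_spent"], 2)}
        for key, v in totals.items()
    }
```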

In [0]:
%sql
SELECT 
        c.LastName AS customer_lastName,
        SUM(i.quantity) AS total_quantity,
        ROUND(SUM(i.unit_price * i.quantity), 2) AS total_spent
    FROM
        chinook_dw.fact_invoices_silver AS i
    LEFT OUTER JOIN chinook_dw.dim_customers AS c
        ON i.customer_key = c.CustomerId
    GROUP BY c.LastName
    ORDER BY total_spent DESC;

customer_lastName,total_quantity,total_spent
Holý,38,49.62
Cunningham,38,47.62
Rojas,38,46.62
O'Reilly,38,45.62
Kovács,38,45.62
Ralston,38,43.62
Zimmermann,38,43.62
Barnett,38,43.62
Gruber,38,42.62
Stevens,38,42.62


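A second aggregation counts the tracks each customer purchased in each month. Months are combined across years, because the query groups by month name only.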

In [0]:
%sql
SELECT 
    i.customer_key
  , c.LastName
  , c.FirstName
  , d.month_name
  , COUNT(i.track_key) AS TrackCount
FROM 
  chinook_dw.fact_invoices_silver AS i
LEFT OUTER JOIN chinook_dw.dim_customers AS c
  ON i.customer_key = c.CustomerId
LEFT OUTER JOIN chinook_dw.dim_date AS d
  ON i.invoice_date_key = d.date_key
GROUP BY customer_key, LastName, FirstName, month_name
ORDER BY TrackCount DESC

customer_key,LastName,FirstName,month_name,TrackCount
6,Holý,Helena,,16
44,Hämäläinen,Terhi,,16
17,Smith,Jack,December,14
56,Gutiérrez,Diego,March,14
54,Murray,Steve,October,14
38,Schröder,Niklas,October,14
4,Hansen,Bjørn,June,14
10,Martins,Eduardo,August,14
40,Lefebvre,Dominique,March,14
2,Köhler,Leonie,February,14
