# **DS-2002 – Data Project 2 (Course Capstone)**
25 points

The goal of the second data project, building upon the first project, is to further demonstrate (1) an
understanding of and (2) competence in creating and implementing basic data science systems such as
pipelines, scripts, data transformations, APIs, databases and cloud services. Submit your project in your
GitHub repository or via file drop on Collab.

Data Projects must be done individually.

**Putting it All Together: Data Integration & Analysis**

**Deliverable:** Using Azure Databricks, design and populate a dimensional Data Lakehouse that represents
a simple business process of your choosing. Examples might include retail sales, inventory, procurement,
order management, transportation or hospitality bookings, medical appointments, student registration
and/or attendance. You may select any business process that interests you, but remember that a
dimensional Data Lakehouse provides for the post hoc summarization and historic analysis of business
transactions that reflect the interaction between various entities (e.g., patients & doctors, retailers &
customers, students & schools/classes, travelers & airlines/hotels).

The most straightforward approach is to identify an existing OLTP example database wherein all
required data relationships already exist; however, you may choose to populate your Data Lakehouse
using data from multiple sources as long as you can successfully use their business keys (e.g., customer
code, product code) to establish the appropriate relationships between the Fact and Dimension tables.

Your project should demonstrate your understanding of the differing types of relational data systems
(OLTP/OLAP), and how data can be **extracted** from various source systems (structured, semi-structured,
unstructured), **transformed** (cleansed, integrated), and then **loaded** into a destination system that’s
optimized for post hoc diagnostic analysis. Your project should also demonstrate your knowledge of data
integration patterns like ETL, ELT and ELTL, and architectures (e.g., lambda or kappa) for integrating batch
and real-time (streaming) data sources.

**Design Requirements:**

Your solution (database schema) needn’t be complex, but should meet the following requirements:

• A **Date dimension** to enable the analysis of the business process over various intervals of time
(the code for creating this in MySQL and Microsoft SQL Server has already been provided for you).

• At least 3 additional dimension tables (e.g., customers, employees, products)

• At least 1 fact table that models the business process (e.g., sales, reservations, bookings)

• Your solution must populate its dimensions using data originating from multiple sources:

o A relational database like Azure MySQL or Azure SQL Server

o A NoSQL database like MongoDB Atlas or Azure Cosmos DB

o Files (e.g., CSV) from a cloud-based file system, like the Databricks File System (DBFS)

• Your solution must integrate data of differing granularity (i.e., static and near real-time)

• Your solution must include results that demonstrate the business value of your solution. For
example, a query (SELECT statement) that summarizes transaction details by customer, product, etc.
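
The design requirements above can be sketched end to end on a very small scale. The following is a minimal illustration only, not the expected Databricks solution: it uses Python's built-in `sqlite3` in place of a Lakehouse, and the sample data, table names and function names are all hypothetical. It extracts from two differently shaped sources, cleanses the shared business key, loads one dimension and one fact table, and then runs a summary query by customer.

```python
import csv
import io
import json
import sqlite3

# Hypothetical sample data standing in for two different source systems.
CUSTOMERS_CSV = "customer_code,name\nC1, Mary Smith \nC2,Linda Williams\n"
SALES_JSON = (
    '[{"customer_code": "c1", "date": "2005-05-24", "amount": 2.99},'
    ' {"customer_code": "C2", "date": "2005-05-24", "amount": 4.99},'
    ' {"customer_code": "C1", "date": "2005-05-25", "amount": 0.99}]'
)


def extract():
    """Extract: read raw rows from a CSV source and a JSON source."""
    customers = list(csv.DictReader(io.StringIO(CUSTOMERS_CSV)))
    sales = json.loads(SALES_JSON)
    return customers, sales


def transform(customers, sales):
    """Transform: trim whitespace and normalize the case of the business key."""
    clean_customers = [
        {"customer_code": c["customer_code"].strip().upper(), "name": c["name"].strip()}
        for c in customers
    ]
    clean_sales = [{**s, "customer_code": s["customer_code"].strip().upper()} for s in sales]
    return clean_customers, clean_sales


def load(conn, customers, sales):
    """Load: fill one dimension and one fact table, resolving business keys."""
    conn.execute(
        "CREATE TABLE dim_customer ("
        "customer_key INTEGER PRIMARY KEY, customer_code TEXT UNIQUE, name TEXT)"
    )
    conn.execute("CREATE TABLE fact_sales (customer_key INTEGER, date_key INTEGER, amount REAL)")
    conn.executemany(
        "INSERT INTO dim_customer (customer_code, name) VALUES (:customer_code, :name)",
        customers,
    )
    for s in sales:
        date_key = int(s["date"].replace("-", ""))  # e.g. 2005-05-24 -> 20050524
        conn.execute(
            "INSERT INTO fact_sales "
            "SELECT customer_key, ?, ? FROM dim_customer WHERE customer_code = ?",
            (date_key, s["amount"], s["customer_code"]),
        )


def sales_by_customer(conn):
    """Summarize fact rows by customer: name, number of sales, total amount."""
    return conn.execute(
        "SELECT d.name, COUNT(*), ROUND(SUM(f.amount), 2) "
        "FROM fact_sales f JOIN dim_customer d USING (customer_key) "
        "GROUP BY d.name ORDER BY d.name"
    ).fetchall()


conn = sqlite3.connect(":memory:")
load(conn, *transform(*extract()))
summary = sales_by_customer(conn)
print(summary)
```

The lowercase `c1` in the JSON source only matches the dimension row because the transform step normalizes the key first, which is the kind of cleansing that integrating multiple sources usually requires.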


**Functional Requirements:**

1. Your solution must demonstrate at least one batch execution (i.e., use sample source data [SQL,
NoSQL and file system] to demonstrate at least one incremental data load).
2. Your solution must demonstrate accumulating data that originates from a real-time (streaming)
data source for a predetermined interval (mini-batch), integrating it with reference data, and
then using the product as a source for populating your dimensional Data Lakehouse (i.e.,
implement the Databricks bronze, silver, gold architecture).

a. Use the Spark AutoLoader to demonstrate integrating streaming data (using separate
JSON files) for at least 3 intervals. This is most easily accomplished by segmenting the
Fact table source data into 3 ranges and exporting them into 3 separate JSON files.

b. Illustrate the relationships between the “real-time” fact data and the static reference
data. This is accomplished by joining fact and dimension tables at the Silver table phase.

3. Use a Databricks Notebook to execute all data integration, object creation and query execution.
4. Please submit all code, and other artifacts, in a GitHub repository in your account.

Note: You may utilize any combination of Cloud service technologies. For example, you can collect
streaming data from a source API on the Internet, integrate it with reference data that’s stored in
another Cloud hosted database service (e.g., MongoDB Atlas) using Databricks, and then load it into
your dimensional Data Lakehouse that’s hosted in an Azure MySQL or Azure SQL database.
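
Requirement 2a suggests segmenting the fact source data into three ranges and exporting each to its own JSON file. One possible way to do that with the Python standard library is sketched below. The function name `split_into_files`, the file prefix and the sample rows are assumptions made for illustration; a real project would export its own fact rows.

```python
import json
import tempfile
from pathlib import Path


def split_into_files(records, n_files, out_dir, prefix="fact_rentals"):
    """Write `records` to `n_files` JSON files of near-equal size, preserving order."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    chunk = -(-len(records) // n_files)  # ceiling division
    paths = []
    for i in range(n_files):
        part = records[i * chunk:(i + 1) * chunk]
        path = out_dir / f"{prefix}{i + 1}.json"
        # Each file holds one JSON array spread over several lines.
        path.write_text(json.dumps(part, indent=2))
        paths.append(path)
    return paths


# Hypothetical fact rows used only to exercise the function.
rentals = [{"rental_id": i, "amount": 2.99} for i in range(1, 11)]
with tempfile.TemporaryDirectory() as tmp:
    files = split_into_files(rentals, 3, tmp)
    sizes = [len(json.loads(p.read_text())) for p in files]
    names = [p.name for p in files]
print(names, sizes)
```

Because each file is written as a single multi-line JSON array, a reader would need a multi-line JSON option enabled to parse it; writing one JSON object per line is a common alternative.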

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # pandas API on Spark (formerly Koalas), bundled with PySpark 3.2 and newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "wux8esmysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_2"

connection_properties = {
  "user" : "wux8es",
  "password" : "wakeforestcomputer1!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.qw666"
atlas_database_name = "sakila_2"
atlas_user_name = "wux8es"
atlas_password = "BO9JpZ3pjcBYM2nR"

# Data Files (JSON) Information ###############################
dst_database = "sakila_2"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/rentals"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

fact_rentals_dir = f"{stream_dir}/fact_rentals"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"


# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

In [0]:
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
uri = "mongodb+srv://hdswartz10:Atlas2023@cluster0.qw666.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))
# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    # Create a client connection to MongoDB.
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    try:
        # Query MongoDB. An empty filter matches every document, and a projection of None returns every field.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Fill a Python list with the documents to create a DataFrame.
        dframe = pd.DataFrame(list(cursor))
    finally:
        # Close the connection even if the query fails.
        client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    # Create a client connection to MongoDB.
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    # Read each JSON file and use it to (re)create the collection of the same name.
    result = None  # Stays None if json_files is empty.
    try:
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_file = os.path.join(src_file_path, file_name)
            with open(json_file, 'r') as openfile:
                json_object = json.load(openfile)
            result = db[collection_name].insert_many(json_object)
    finally:
        # Close the connection even if a file cannot be read or inserted.
        client.close()

    # Only the result of the last insert is returned.
    return result
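
Both functions above interpolate the user name and password directly into the connection string. If either value contains characters such as `@` or `/`, the URI will not parse as intended, so credentials are normally percent-escaped first. A minimal sketch follows; `build_mongo_uri`, the environment variable names and the placeholder values are all hypothetical, and reading credentials from the environment is just one way to keep them out of the notebook.

```python
import os
from urllib.parse import quote_plus


def build_mongo_uri(user, password, cluster_name, db_name):
    """Build a mongodb+srv URI with percent-escaped credentials."""
    return (
        f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )


# Hypothetical variable names and fallback values, for illustration only.
user = os.environ.get("ATLAS_USER", "example_user")
password = os.environ.get("ATLAS_PASSWORD", "p@ss/word")
uri = build_mongo_uri(user, password, "cluster0.example", "sakila_2")
```

The returned string has the same shape as `mongo_uri` in the functions above, so it could be passed to `pymongo.MongoClient` in the same way.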

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_2 CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_2
COMMENT "DS-2002 Capstone Database"
LOCATION "dbfs:/FileStore/lab_data/sakila_2"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wux8esmysql.mysql.database.azure.com:3306/sakila_2?useSSL=true",
  dbtable "dim_date",
  user "wux8es",   
  password "wakeforestcomputer1!" 
)
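
The JDBC URL in the view definition above repeats values that the notebook already stores in `jdbc_hostname`, `jdbc_port` and `src_database`. As a small sketch, the same URL shape can be assembled from those parts. `build_jdbc_url` is a hypothetical helper and the host name below is a placeholder.

```python
def build_jdbc_url(hostname, port, database, use_ssl=True):
    """Return a MySQL JDBC URL in the same shape as the one in the view definition."""
    return f"jdbc:mysql://{hostname}:{port}/{database}?useSSL={str(use_ssl).lower()}"


# Placeholder host; in the notebook these values would come from the global variables.
jdbc_url = build_jdbc_url("example.mysql.database.azure.com", 3306, "sakila_2")
print(jdbc_url)
```

In a Python cell, a URL built this way could be passed to `spark.read.jdbc(url=jdbc_url, table="dim_date", properties=connection_properties)` as an alternative to the SQL view. That call has not been verified against this particular cluster and driver configuration.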

In [0]:
%sql
USE DATABASE sakila_2;

CREATE OR REPLACE TABLE sakila_2.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_2/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_2.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_2.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20050101,2005-01-01,2005/01/01,01/01/2005,01/01/2005,7,Saturday,1,1,Weekend,53,January,1,N,1,2005,2005-01,2005Q1,7,3,2005,2005-07,2005Q3
20050102,2005-01-02,2005/01/02,01/02/2005,02/01/2005,1,Sunday,2,2,Weekend,53,January,1,N,1,2005,2005-01,2005Q1,7,3,2005,2005-07,2005Q3
20050103,2005-01-03,2005/01/03,01/03/2005,03/01/2005,2,Monday,3,3,Weekday,1,January,1,N,1,2005,2005-01,2005Q1,7,3,2005,2005-07,2005Q3
20050104,2005-01-04,2005/01/04,01/04/2005,04/01/2005,3,Tuesday,4,4,Weekday,1,January,1,N,1,2005,2005-01,2005Q1,7,3,2005,2005-07,2005Q3
20050105,2005-01-05,2005/01/05,01/05/2005,05/01/2005,4,Wednesday,5,5,Weekday,1,January,1,N,1,2005,2005-01,2005Q1,7,3,2005,2005-07,2005Q3
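
The sample rows above show how several date-dimension columns relate to the calendar date. The sketch below derives a few of them in Python. It assumes the Sunday=1 through Saturday=7 numbering that the rows display, and `date_dim_row` is a hypothetical helper, not the provided MySQL or SQL Server script.

```python
from datetime import date

DAY_NAMES = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")


def date_dim_row(d):
    """Derive a few date-dimension attributes for a single calendar date."""
    iso = d.isoweekday()  # Monday=1 ... Sunday=7
    quarter = (d.month - 1) // 3 + 1
    return {
        "date_key": d.year * 10000 + d.month * 100 + d.day,  # yyyymmdd as an integer
        "full_date": d.isoformat(),
        "day_of_week": iso % 7 + 1,  # Sunday=1 ... Saturday=7, as in the sample rows
        "day_name_of_week": DAY_NAMES[d.weekday()],
        "weekday_weekend": "Weekend" if iso >= 6 else "Weekday",
        "calendar_quarter": quarter,
        "calendar_year_qtr": f"{d.year}Q{quarter}",
    }


row = date_dim_row(date(2005, 1, 1))
print(row)
```

An integer `date_key` in `yyyymmdd` form lets fact rows join to the date dimension without any date parsing at query time.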


##### 1.3. Create a New Table that Sources Film (Product) Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_film
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wux8esmysql.mysql.database.azure.com:3306/sakila_2?useSSL=true", 
  dbtable "film",
  user "wux8es",
  password "wakeforestcomputer1!"
)

In [0]:
%sql
USE DATABASE sakila_2;

CREATE OR REPLACE TABLE sakila_2.view_film
COMMENT "Film Details"
LOCATION "dbfs:/FileStore/lab_data/sakila_2/view_film"
AS SELECT * FROM view_film


num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_2.view_film;

col_name,data_type,comment
film_id,int,
title,varchar(128),
description,varchar(65535),
release_year,date,
language_id,tinyint,
original_language_id,tinyint,
rental_duration,tinyint,
rental_rate,"decimal(4,2)",
length,int,
replacement_cost,"decimal(5,2)",


In [0]:
%sql
SELECT * FROM sakila_2.view_film LIMIT 5

film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006-01-01,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15T10:03:42Z
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006-01-01,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15T10:03:42Z
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006-01-01,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T10:03:42Z
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006-01-01,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15T10:03:42Z
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006-01-01,1,,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15T10:03:42Z


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # 'dbfs:/FileStore/lab_data/rentals/batch'

path,name,size,modificationTime
dbfs:/FileStore/lab_data/rentals/batch/sakila_2_actor.csv,sakila_2_actor.csv,8649,1733590497000
dbfs:/FileStore/lab_data/rentals/batch/sakila_2_film_data.json,sakila_2_film_data.json,229685,1733590497000
dbfs:/FileStore/lab_data/rentals/batch/sakila_2_inventory_data.json,sakila_2_inventory_data.json,241198,1733590654000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/lab_data/rentals/batch'
json_files = {"film_data" : 'sakila_2_film_data.json'
              , "inventory_data" : 'sakila_2_inventory_data.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

InsertManyResult([ObjectId('67547fc14c51e311c8fcf6c7'), ObjectId('67547fc14c51e311c8fcf6c8'), ObjectId('67547fc14c51e311c8fcf6c9'), ObjectId('67547fc14c51e311c8fcf6ca'), ObjectId('67547fc14c51e311c8fcf6cb'), ObjectId('67547fc14c51e311c8fcf6cc'), ObjectId('67547fc14c51e311c8fcf6cd'), ObjectId('67547fc14c51e311c8fcf6ce'), ObjectId('67547fc14c51e311c8fcf6cf'), ObjectId('67547fc14c51e311c8fcf6d0'), ObjectId('67547fc14c51e311c8fcf6d1'), ObjectId('67547fc14c51e311c8fcf6d2'), ObjectId('67547fc14c51e311c8fcf6d3'), ObjectId('67547fc14c51e311c8fcf6d4'), ObjectId('67547fc14c51e311c8fcf6d5'), ObjectId('67547fc14c51e311c8fcf6d6'), ObjectId('67547fc14c51e311c8fcf6d7'), ObjectId('67547fc14c51e311c8fcf6d8'), ObjectId('67547fc14c51e311c8fcf6d9'), ObjectId('67547fc14c51e311c8fcf6da'), ObjectId('67547fc14c51e311c8fcf6db'), ObjectId('67547fc14c51e311c8fcf6dc'), ObjectId('67547fc14c51e311c8fcf6dd'), ObjectId('67547fc14c51e311c8fcf6de'), ObjectId('67547fc14c51e311c8fcf6df'), ObjectId('67547fc14c51e311c8fcf6

##### 2.3.1. Fetch Film Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "wux8es"
val pwd = "BO9JpZ3pjcBYM2nR"
val clusterName = "cluster0.qw666"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_film_data = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_2")
.option("collection", "film_data").load()
.select("film_id","title","release_year","language_id","rental_duration","rental_rate","length","replacement_cost","rating","special_features")

display(df_film_data)

film_id,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,2006,1,6,2.99,130,22.99,G,Deleted Scenes
6,AGENT TRUMAN,2006,1,3,2.99,169,17.99,PG,Deleted Scenes
7,AIRPLANE SIERRA,2006,1,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes"
8,AIRPORT POLLOCK,2006,1,6,4.99,54,15.99,R,Trailers
9,ALABAMA DEVIL,2006,1,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes"
10,ALADDIN CALENDAR,2006,1,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes"


In [0]:
%scala
df_film_data.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Film Dimension Table in the Databricks Metadata Database (sakila_2)

In [0]:
%scala
df_film_data.write.format("delta").mode("overwrite").saveAsTable("sakila_2.film_data")

In [0]:
%sql
DESCRIBE EXTENDED sakila_2.film_data

col_name,data_type,comment
film_id,int,
title,string,
release_year,int,
language_id,int,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,
rating,string,
special_features,string,


In [0]:
%sql
SELECT * FROM sakila_2.film_data LIMIT 5

film_id,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,2006,1,6,2.99,130,22.99,G,Deleted Scenes


##### 2.4.1. Fetch Inventory Data from the New MongoDB Collection

In [0]:
%scala

val df_inventory_data = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_2")
.option("collection", "inventory_data").load()
.select("inventory_id", "film_id","store_id")

display(df_inventory_data)


inventory_id,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2
6,1,2
7,1,2
8,1,2
9,2,2
10,2,2


In [0]:
%scala
df_inventory_data.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Inventory Dimension Table in the Databricks Metadata Database (sakila_2)

In [0]:

%scala
df_inventory_data.write.format("delta").mode("overwrite").saveAsTable("sakila_2.dim_inventory_data")

In [0]:
%sql
DESCRIBE EXTENDED sakila_2.dim_inventory_data

col_name,data_type,comment
inventory_id,int,
film_id,int,
store_id,int,
,,
# Delta Statistics Columns,,
Column Names,"inventory_id, film_id, store_id",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_2.dim_inventory_data LIMIT 5

inventory_id,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
customer_csv = f"{batch_dir}/sakila_2_customer.csv"

df_customer = spark.read.format('csv').options(header='true', inferSchema='true').load(customer_csv)
display(df_customer)

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z


In [0]:
df_customer.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_2.df_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_2.df_customer;

col_name,data_type,comment
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,int,
create_date,timestamp,
last_update,timestamp,
,,


In [0]:
%sql
SELECT * FROM sakila_2.df_customer LIMIT 5;

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_2;
SHOW TABLES

database,tableName,isTemporary
sakila_2,df_actor,False
sakila_2,df_customer,False
sakila_2,dim_date,False
sakila_2,dim_inventory_data,False
sakila_2,fact_film_rentals_by_customer_gold,False
sakila_2,fact_monthly_rentals_by_customer_gold,False
sakila_2,fact_rentals_bronze,False
sakila_2,fact_rentals_silver,False
sakila_2,film,False
sakila_2,film_data,False


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Rentals Fact Data
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 # Optional schema hints. Pass them as ONE comma-separated string; repeating
 # .option("cloudFiles.schemaHints", ...) keeps only the last value set.
 #.option("cloudFiles.schemaHints",
 #        "rental_id BIGINT, amount DECIMAL(10,2), payment_id BIGINT, customer_id BIGINT, "
 #        "staff_id BIGINT, inventory_id BIGINT, rental_date_key INT, return_date_key INT")

 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(fact_rentals_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)
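
The Bronze step above picks up each new JSON file once and tags every record with `receipt_time` and `source_file`. The toy model below illustrates that idea in plain Python. It is not how Auto Loader is implemented: the `processed` set merely stands in for the streaming checkpoint, and `ingest_new_files` and the file names are hypothetical.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def ingest_new_files(src_dir, processed, bronze):
    """Append records from files not seen before; return the names just ingested."""
    new_files = sorted(p for p in Path(src_dir).glob("*.json") if p.name not in processed)
    receipt_time = datetime.now(timezone.utc).isoformat()
    for path in new_files:
        for record in json.loads(path.read_text()):
            # Traceability metadata, mirroring receipt_time and source_file.
            bronze.append({**record, "receipt_time": receipt_time, "source_file": path.name})
        processed.add(path.name)  # stands in for the streaming checkpoint
    return [p.name for p in new_files]


processed, bronze = set(), []
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "rentals1.json").write_text(json.dumps([{"rental_id": 1}, {"rental_id": 2}]))
    first = ingest_new_files(tmp, processed, bronze)
    Path(tmp, "rentals2.json").write_text(json.dumps([{"rental_id": 3}]))
    second = ingest_new_files(tmp, processed, bronze)  # picks up only the new file
    third = ingest_new_files(tmp, processed, bronze)   # nothing new, nothing appended
print(first, second, third, len(bronze))
```

Recording which files have already been handled is what lets the three interval files be dropped into the source directory one at a time without earlier rows being loaded twice.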

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

amount,customer_id,inventory_id,payment_id,rental_date_key,rental_id,return_date_key,staff_id,_rescued_data,receipt_time,source_file
4.99,120,47,3243,20050801,10696,20050804,2,,2024-12-07T21:15:31.008Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals3.json
4.99,519,3115,13982,20050801,10697,20050807,2,,2024-12-07T21:15:31.008Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals3.json
3.99,135,2738,3657,20050801,10698,20050808,1,,2024-12-07T21:15:31.008Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals3.json
2.99,125,1029,3376,20050801,10699,20050806,1,,2024-12-07T21:15:31.008Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals3.json
3.99,203,4259,5508,20050801,10700,20050807,2,,2024-12-07T21:15:31.008Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals3.json
3.99,538,3958,14498,20050801,10701,20050809,2,,2024-12-07T21:15:31.008Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals3.json
4.99,560,2802,15009,20050801,10702,20050809,1,,2024-12-07T21:15:31.008Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals3.json
2.99,181,1818,4933,20050801,10703,20050807,2,,2024-12-07T21:15:31.008Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals3.json
8.99,594,960,15923,20050801,10704,20050808,2,,2024-12-07T21:15:31.008Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals3.json
0.99,381,4338,10330,20050801,10705,20050804,2,,2024-12-07T21:15:31.008Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals3.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f8310672ad0>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

amount,customer_id,inventory_id,payment_id,rental_date_key,rental_id,return_date_key,staff_id,_rescued_data,receipt_time,source_file
2.99,130,367,3504,20050524,1,20050526,1,,2024-12-07T18:59:54.824Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals1.json
2.99,459,1525,12377,20050524,2,20050528,2,,2024-12-07T18:59:54.824Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals1.json
3.99,408,1711,11032,20050524,3,20050601,2,,2024-12-07T18:59:54.824Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals1.json
4.99,333,2452,8987,20050524,4,20050603,1,,2024-12-07T18:59:54.824Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals1.json
6.99,222,2079,6003,20050524,5,20050602,1,,2024-12-07T18:59:54.824Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals1.json
0.99,549,2792,14728,20050524,6,20050527,1,,2024-12-07T18:59:54.824Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals1.json
1.99,269,3995,7274,20050524,7,20050529,2,,2024-12-07T18:59:54.824Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals1.json
4.99,239,2346,6440,20050524,8,20050527,2,,2024-12-07T18:59:54.824Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals1.json
4.99,126,2580,3386,20050525,9,20050528,1,,2024-12-07T18:59:54.824Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals1.json
5.99,399,1824,10785,20050525,10,20050531,2,,2024-12-07T18:59:54.824Z,dbfs:/FileStore/lab_data/rentals/stream/fact_rentals/Fact_rentals1.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
amount,double,
customer_id,bigint,
inventory_id,bigint,
payment_id,bigint,
rental_date_key,bigint,
rental_id,bigint,
return_date_key,bigint,
staff_id,bigint,
_rescued_data,string,
receipt_time,timestamp,


In [0]:
%sql

CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT 
    fr.rental_id,
    fr.inventory_id,
    inv.film_id,
    fd.title AS film_title,
    fd.release_year AS film_release_year,
    fd.rental_duration AS film_rental_duration,
    fd.rental_rate AS film_rental_rate,
    fd.length AS film_length,
    fd.replacement_cost AS film_replacement_cost,
    fd.rating AS film_rating,
    fd.special_features AS film_special_features,
    fr.customer_id,
    c.first_name AS customer_first_name,
    c.last_name AS customer_last_name,
    c.email AS customer_email,
    c.active AS customer_active,
    fr.staff_id,
    fr.amount,
    fr.payment_id,
    fr.rental_date_key,
    d_rental.full_date AS rental_date,
    d_rental.day_name_of_week AS rental_day_name_of_week,
    d_rental.weekday_weekend AS rental_weekday_weekend,
    d_rental.month_name AS rental_month_name,
    d_rental.calendar_quarter AS rental_quarter,
    d_rental.calendar_year AS rental_year,
    fr.return_date_key,
    d_return.full_date AS return_date,
    d_return.day_name_of_week AS return_day_name_of_week,
    d_return.weekday_weekend AS return_weekday_weekend,
    d_return.month_name AS return_month_name,
    d_return.calendar_quarter AS return_quarter,
    d_return.calendar_year AS return_year

  FROM rentals_silver_tempview AS fr
  INNER JOIN sakila_2.dim_inventory_data AS inv
    ON fr.inventory_id = inv.inventory_id
  INNER JOIN sakila_2.film_data AS fd
    ON inv.film_id = fd.film_id
  INNER JOIN sakila_2.df_customer AS c
    ON fr.customer_id = c.customer_id
  LEFT OUTER JOIN sakila_2.dim_date AS d_rental
    ON fr.rental_date_key = d_rental.date_key
  LEFT OUTER JOIN sakila_2.dim_date AS d_return
    ON fr.return_date_key = d_return.date_key
);
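
The Silver view above mixes `INNER JOIN` (inventory, film, customer) with `LEFT OUTER JOIN` (both date roles). The choice matters: an inner join drops fact rows with no matching dimension row, while a left outer join keeps them and fills the dimension columns with NULL. The following plain-Python sketch illustrates the difference. The `join` helper and all sample rows are hypothetical.

```python
def join(facts, dim, key, dim_columns, how="inner"):
    """Join fact dicts to a dimension dict keyed by `key`; `how` is 'inner' or 'left'."""
    out = []
    for fact in facts:
        match = dim.get(fact[key])
        if match is None:
            if how == "inner":
                continue  # inner join: unmatched fact rows disappear
            match = dict.fromkeys(dim_columns)  # left join: pad with None (SQL NULL)
        out.append({**fact, **match})
    return out


# Hypothetical rows: rental 2 references a customer missing from the dimension,
# and rental 3 has no return date yet.
facts = [
    {"rental_id": 1, "customer_id": 130, "return_date_key": 20050526},
    {"rental_id": 2, "customer_id": 999, "return_date_key": 20050528},
    {"rental_id": 3, "customer_id": 459, "return_date_key": None},
]
dim_customer = {130: {"first_name": "A"}, 459: {"first_name": "B"}}
dim_date = {20050526: {"return_date": "2005-05-26"}, 20050528: {"return_date": "2005-05-28"}}

with_customer = join(facts, dim_customer, "customer_id", ["first_name"], how="inner")
silver = join(with_customer, dim_date, "return_date_key", ["return_date"], how="left")
print(silver)
```

In this sketch rental 2 is lost at the inner join, while rental 3 survives the left join with an empty `return_date`. Using outer joins for the date roles is one way to keep rentals that have not been returned yet.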



In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f8310f357d0>

In [0]:
%sql
SELECT * FROM fact_rentals_silver

rental_id,inventory_id,film_id,film_title,film_release_year,film_rental_duration,film_rental_rate,film_length,film_replacement_cost,film_rating,film_special_features,customer_id,staff_id,amount,payment_id,rental_date_key,rental_date,rental_day_name_of_week,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,return_date_key,return_date,return_day_name_of_week,return_weekday_weekend,return_month_name,return_quarter,return_year
1,367,80,BLANKET BEVERLY,2006,7,2.99,148,21.99,G,Trailers,130,1,2.99,3504,20050524,2005-05-24,Tuesday,Weekday,May,2,2005,20050526,2005-05-26,Thursday,Weekday,May,2.0,2005.0
2,1525,333,FREAKY POCUS,2006,7,2.99,126,16.99,R,"Trailers,Behind the Scenes",459,2,2.99,12377,20050524,2005-05-24,Tuesday,Weekday,May,2,2005,20050528,2005-05-28,Saturday,Weekend,May,2.0,2005.0
3,1711,373,GRADUATE LORD,2006,7,2.99,156,14.99,G,"Trailers,Behind the Scenes",408,2,3.99,11032,20050524,2005-05-24,Tuesday,Weekday,May,2,2005,20050601,2005-06-01,Wednesday,Weekday,June,2.0,2005.0
4,2452,535,LOVE SUICIDES,2006,6,0.99,181,21.99,R,"Trailers,Behind the Scenes",333,1,4.99,8987,20050524,2005-05-24,Tuesday,Weekday,May,2,2005,20050603,2005-06-03,Friday,Weekday,June,2.0,2005.0
5,2079,450,IDOLS SNATCHERS,2006,5,2.99,84,29.99,NC-17,Trailers,222,1,6.99,6003,20050524,2005-05-24,Tuesday,Weekday,May,2,2005,20050602,2005-06-02,Thursday,Weekday,June,2.0,2005.0
6,2792,613,MYSTIC TRUMAN,2006,5,0.99,92,19.99,NC-17,"Commentaries,Behind the Scenes",549,1,0.99,14728,20050524,2005-05-24,Tuesday,Weekday,May,2,2005,20050527,2005-05-27,Friday,Weekday,May,2.0,2005.0
7,3995,870,SWARM GOLD,2006,4,0.99,123,12.99,PG-13,"Trailers,Commentaries",269,2,1.99,7274,20050524,2005-05-24,Tuesday,Weekday,May,2,2005,20050529,2005-05-29,Sunday,Weekend,May,2.0,2005.0
8,2346,510,LAWLESS VISION,2006,6,4.99,181,29.99,G,"Deleted Scenes,Behind the Scenes",239,2,4.99,6440,20050524,2005-05-24,Tuesday,Weekday,May,2,2005,20050527,2005-05-27,Friday,Weekday,May,2.0,2005.0
9,2580,565,MATRIX SNOWMAN,2006,6,4.99,56,9.99,PG-13,"Commentaries,Deleted Scenes,Behind the Scenes",126,1,4.99,3386,20050525,2005-05-25,Wednesday,Weekday,May,2,2005,20050528,2005-05-28,Saturday,Weekend,May,2.0,2005.0
10,1824,396,HANGING DEEP,2006,5,4.99,62,18.99,G,"Trailers,Commentaries,Deleted Scenes",399,2,5.99,10785,20050525,2005-05-25,Wednesday,Weekday,May,2,2005,20050531,2005-05-31,Tuesday,Weekday,May,2.0,2005.0


In [0]:
%sql
DESCRIBE EXTENDED sakila_2.fact_rentals_silver

col_name,data_type,comment
rental_id,bigint,
inventory_id,bigint,
film_id,int,
film_title,string,
film_release_year,int,
film_rental_duration,int,
film_rental_rate,double,
film_length,int,
film_replacement_cost,double,
film_rating,string,


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. The table includes the number of films rented per customer each month, along with the customer's ID and the month in which the rental was made.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_2.fact_monthly_rentals_by_customer_gold AS (
  SELECT
    customer_id AS CustomerID,
    rental_month_name AS RentalMonth,
    COUNT(film_id) AS FilmCount
  FROM sakila_2.fact_rentals_silver
  GROUP BY CustomerID, RentalMonth
  ORDER BY FilmCount DESC
);

SELECT * FROM sakila_2.fact_monthly_rentals_by_customer_gold;


CustomerID,RentalMonth,FilmCount
197,May,8
197,August,7
19,May,6
245,May,6
371,May,6
274,May,6
596,May,6
502,August,6
407,August,6
418,August,6
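
The monthly Gold aggregation above is a group-and-count over two columns followed by a descending sort. The same logic can be sketched in plain Python with `collections.Counter`. The rows below are hypothetical and reduced to the columns the query uses; unlike `COUNT(film_id)`, this sketch counts every row, which gives the same result only when `film_id` is never NULL.

```python
from collections import Counter

# Hypothetical silver rows, reduced to the columns the Gold query uses.
silver_rows = [
    {"customer_id": 197, "rental_month_name": "May", "film_id": 178},
    {"customer_id": 197, "rental_month_name": "May", "film_id": 529},
    {"customer_id": 197, "rental_month_name": "August", "film_id": 761},
    {"customer_id": 19, "rental_month_name": "May", "film_id": 443},
]

# GROUP BY customer and month, counting rows in each group.
counts = Counter((r["customer_id"], r["rental_month_name"]) for r in silver_rows)

# Equivalent of ORDER BY FilmCount DESC; ties are broken by key for a stable result.
gold = sorted(
    ({"CustomerID": c, "RentalMonth": m, "FilmCount": n} for (c, m), n in counts.items()),
    key=lambda row: (-row["FilmCount"], row["CustomerID"], row["RentalMonth"]),
)
print(gold)
```

Grouping on the month name alone merges the same month from different years. That is harmless when the data covers a single year, but a year column would be needed in the grouping otherwise.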


In [0]:
%sql
CREATE OR REPLACE TABLE sakila_2.fact_film_rentals_by_customer_gold AS (
  SELECT
    frs.customer_id AS CustomerID,
    frs.film_id AS FilmID,
    fd.title AS FilmTitle,
    pc.FilmCount
  FROM sakila_2.fact_rentals_silver AS frs
  INNER JOIN (
    SELECT 
      customer_id AS CustomerID,
      COUNT(film_id) AS FilmCount
    FROM sakila_2.fact_rentals_silver
    GROUP BY customer_id
  ) AS pc ON pc.CustomerID = frs.customer_id
  INNER JOIN sakila_2.film_data AS fd
    ON frs.film_id = fd.film_id
  ORDER BY FilmCount DESC
);

SELECT * FROM sakila_2.fact_film_rentals_by_customer_gold;


CustomerID,FilmID,FilmTitle,FilmCount
197,178,CONNECTION MICROCOSMOS,15
197,529,LONELY ELEPHANT,15
197,761,SANTA PARIS,15
197,443,HURRICANE AFFAIR,15
197,387,GUN BONNIE,15
197,474,JADE BUNCH,15
197,149,CHRISTMAS MOONSHINE,15
197,366,GOLDFINGER SENSIBILITY,15
197,303,FANTASY TROOPERS,15
197,857,STRICTLY SCARFACE,15


#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/lab_data/