## Capstone Project: Data Lakehouse with Structured Streaming
In this final project, I use the relational database management system Microsoft SQL Server, the NoSQL system MongoDB using JSON files, my local file system using CSV, and the massively parallel processing data integration systems Apache Spark and Databricks to successfully extract, transform, and load data.

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd 
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "scj5sa-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "Sakila_dw2"

connection_properties = {
  "user" : "scj5sa",
  "password" : "lm1gMuZT9n6cx5l2",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.exnmclm"
atlas_database_name = "Sakila_dw2"
atlas_user_name = "scj5sa"
atlas_password = "lm1gMuZT9n6cx5l2"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/capstone_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/movies"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

payments_stream_dir = f"{stream_dir}/payments"
rentals_stream_dir = f"{stream_dir}/rentals"

payments_output_bronze = f"{database_dir}/fact_payments/bronze"
payments_output_silver = f"{database_dir}/fact_payments/silver"
payments_output_gold   = f"{database_dir}/fact_payments/gold"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_payments", True) 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[2]: True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populating Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetching Reference Data From an Azure MySQL Database
##### 1.1. Creating a New Databricks Metadata Database

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Capstone"
LOCATION "dbfs:/FileStore/capstone_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### 1.2. Creating a New Table that Sources Date Dimension Data from a Table in my Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://scj5sa-mysql.mysql.database.azure.com:3306/Sakila_dw2", 
  dbtable "dim_date",
  user "scj5sa",   
  password "Sterls120$"  
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/capstone_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Creating a New Table that Sources Film Dimension Data from my Azure MySQL database.

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TEMPORARY VIEW view_film
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://scj5sa-mysql.mysql.database.azure.com:3306/Sakila_dw2", 
  dbtable "dim_films",
  user "scj5sa",    
  password "Sterls120$"  
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_film
COMMENT "Film Dimension Table"
LOCATION "dbfs:/FileStore/capstone_data/sakila_dlh/dim_film"
AS SELECT * FROM view_film

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_key,int,
title,string,
description,string,
release_year,date,
rental_duration,int,
rental_rate,"decimal(4,2)",
length,int,
replacement_cost,"decimal(5,2)",
rating,string,
special_features,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_key,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006-01-01,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15T05:03:42.000+0000
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006-01-01,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15T05:03:42.000+0000
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006-01-01,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T05:03:42.000+0000
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006-01-01,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15T05:03:42.000+0000
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006-01-01,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15T05:03:42.000+0000


#### 2.0. Fetching Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/capstone_data/movies/batch'

path,name,size,modificationTime
dbfs:/FileStore/capstone_data/movies/batch/Sakila_DimCategoriesNew.csv,Sakila_DimCategoriesNew.csv,611,1701809273000
dbfs:/FileStore/capstone_data/movies/batch/Sakila_DimCustomers.json,Sakila_DimCustomers.json,118938,1701809273000
dbfs:/FileStore/capstone_data/movies/batch/Sakila_DimInventoriesNew.csv,Sakila_DimInventoriesNew.csv,158793,1701809274000
dbfs:/FileStore/capstone_data/movies/batch/Sakila_DimStores.json,Sakila_DimStores.json,183,1701809274000


##### 2.2. Creating a New MongoDB Database, and Loading JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/capstone_data/movies/batch'
json_files = {"customers" : 'Sakila_DimCustomers.json'
              , "stores" : 'Sakila_DimStores.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[15]: <pymongo.results.InsertManyResult at 0x7fe535be7140>

##### 2.3.1. Fetching Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "scj5sa"
val pwd = "lm1gMuZT9n6cx5l2"
val clusterName = "cluster0.exnmclm"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "Sakila_dw2")
.option("collection", "customers")
.option("uri", atlas_uri).load()
.select("customer_id","store_id","first_name","last_name","email","create_date","last_update")

display(df_customer)

customer_id,store_id,first_name,last_name,email,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20


In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Using the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
create_date,string,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_id,store_id,first_name,last_name,email,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20


##### 2.4.1 Fetching Store Dimension Data from the New MongoDB Collection

In [0]:
%scala

val df_store = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "Sakila_dw2")
.option("collection", "stores")
.option("uri", atlas_uri).load()
.select("store_id","manager_staff_id","address_id","last_update")

display(df_store)

store_id,manager_staff_id,address_id,last_update
1,1,1,2006-02-15 04:57:12
2,2,2,2006-02-15 04:57:12


In [0]:
%scala
df_store.printSchema()

##### 2.4.2. Using the Spark DataFrame to Create a New Stores Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_store.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_store")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_store

col_name,data_type,comment
store_id,int,
manager_staff_id,int,
address_id,int,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_store,
Created Time,Wed Dec 06 20:06:03 UTC 2023,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_store LIMIT 5

store_id,manager_staff_id,address_id,last_update
1,1,1,2006-02-15 04:57:12
2,2,2,2006-02-15 04:57:12


#### 3.0. Fetching Data from a File System
##### 3.1. Using PySpark to Read Category Dimension Data From a CSV File

In [0]:
category_csv = f"{batch_dir}/Sakila_DimCategoriesNew.csv"

df_category = spark.read.format('csv').options(header='true', inferSchema='true').load(category_csv)
display(df_category)

category_id,name,last_update
1,Action,2006-02-15T04:46:27.000+0000
2,Animation,2006-02-15T04:46:27.000+0000
3,Children,2006-02-15T04:46:27.000+0000
4,Classics,2006-02-15T04:46:27.000+0000
5,Comedy,2006-02-15T04:46:27.000+0000
6,Documentary,2006-02-15T04:46:27.000+0000
7,Drama,2006-02-15T04:46:27.000+0000
8,Family,2006-02-15T04:46:27.000+0000
9,Foreign,2006-02-15T04:46:27.000+0000
10,Games,2006-02-15T04:46:27.000+0000


In [0]:
df_category.printSchema()

root
 |-- category_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_category.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_category")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_category;

col_name,data_type,comment
category_id,int,
name,string,
last_update,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_category,
Created Time,Wed Dec 06 20:07:02 UTC 2023,
Last Access,UNKNOWN,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_category LIMIT 5;

category_id,name,last_update
1,Action,2006-02-15T04:46:27.000+0000
2,Animation,2006-02-15T04:46:27.000+0000
3,Children,2006-02-15T04:46:27.000+0000
4,Classics,2006-02-15T04:46:27.000+0000
5,Comedy,2006-02-15T04:46:27.000+0000


##### 3.2 Using PySpark to Read Inventory Dimension Data from CSV File

In [0]:
inventory_csv = f"{batch_dir}/Sakila_DimInventoriesNew.csv"

df_inventory = spark.read.format('csv').options(header='true', inferSchema='true').load(inventory_csv)
display(df_inventory)

inventory_id,film_id,store_id,last_update
1,1,1,2006-02-15T05:09:17.000+0000
2,1,1,2006-02-15T05:09:17.000+0000
3,1,1,2006-02-15T05:09:17.000+0000
4,1,1,2006-02-15T05:09:17.000+0000
5,1,2,2006-02-15T05:09:17.000+0000
6,1,2,2006-02-15T05:09:17.000+0000
7,1,2,2006-02-15T05:09:17.000+0000
8,1,2,2006-02-15T05:09:17.000+0000
9,2,2,2006-02-15T05:09:17.000+0000
10,2,2,2006-02-15T05:09:17.000+0000


In [0]:
df_inventory.printSchema()

root
 |-- inventory_id: integer (nullable = true)
 |-- film_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory;

col_name,data_type,comment
inventory_id,int,
film_id,int,
store_id,int,
last_update,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Created Time,Wed Dec 06 20:07:45 UTC 2023,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5;

inventory_id,film_id,store_id,last_update
1,1,1,2006-02-15T05:09:17.000+0000
2,1,1,2006-02-15T05:09:17.000+0000
3,1,1,2006-02-15T05:09:17.000+0000
4,1,1,2006-02-15T05:09:17.000+0000
5,1,2,2006-02-15T05:09:17.000+0000


##### Verifying Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_category,False
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_store,False
,view_date,True
,view_film,True


### Section III: Integrating Reference Data with Real-Time Data
#### 6.0. Using AutoLoader to Process Streaming (Hot Path) Payments Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "payment_id BIGINT")
 #.option("cloudFiles.schemaHints", "customer_id BIGINT")
 #.option("cloudFiles.schemaHints", "staff_id BIGINT")
 #.option("cloudFiles.schemaHints", "rental_id BIGINT") 
 #.option("cloudFiles.schemaHints", "amount DECIMAL")
 #.option("cloudFiles.schemaHints", "payment_date STRING")
 #.option("cloudFiles.schemaHints", "last_update STRING")
 .option("cloudFiles.schemaLocation", payments_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(payments_stream_dir)
 .createOrReplaceTempView("payments_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW payments_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM payments_raw_tempview
)

In [0]:
%sql
SELECT * FROM payments_bronze_tempview

amount,customer_id,payment_date_key,payment_id,rental_id,staff_id,_rescued_data,receipt_time,source_file
2.99,25,20050710,667,5881,1,,2023-12-06T20:09:18.621+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
4.99,25,20050712,668,6653,1,,2023-12-06T20:09:18.621+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
2.99,25,20050712,669,6905,2,,2023-12-06T20:09:18.621+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
2.99,25,20050729,670,8667,2,,2023-12-06T20:09:18.621+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
0.99,25,20050730,671,8878,2,,2023-12-06T20:09:18.621+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
8.99,25,20050730,672,9140,1,,2023-12-06T20:09:18.621+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
2.99,25,20050730,673,9334,2,,2023-12-06T20:09:18.621+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
2.99,25,20050731,674,9922,2,,2023-12-06T20:09:18.621+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
2.99,25,20050731,675,10103,2,,2023-12-06T20:09:18.621+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
5.99,25,20050801,676,10324,1,,2023-12-06T20:09:18.621+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json


In [0]:
(spark.table("payments_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{payments_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_payments_bronze"))

Out[34]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fe534b6d490>

##### 6.2. Silver Table: Including Reference Data

In [0]:
(spark.readStream
  .table("fact_payments_bronze")
  .createOrReplaceTempView("payments_silver_tempview"))

In [0]:
%sql
SELECT * FROM payments_silver_tempview

amount,customer_id,payment_date_key,payment_id,rental_id,staff_id,_rescued_data,receipt_time,source_file
2.99,25,20050710,667,5881,1,,2023-12-06T20:11:23.465+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
4.99,25,20050712,668,6653,1,,2023-12-06T20:11:23.465+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
2.99,25,20050712,669,6905,2,,2023-12-06T20:11:23.465+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
2.99,25,20050729,670,8667,2,,2023-12-06T20:11:23.465+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
0.99,25,20050730,671,8878,2,,2023-12-06T20:11:23.465+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
8.99,25,20050730,672,9140,1,,2023-12-06T20:11:23.465+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
2.99,25,20050730,673,9334,2,,2023-12-06T20:11:23.465+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
2.99,25,20050731,674,9922,2,,2023-12-06T20:11:23.465+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
2.99,25,20050731,675,10103,2,,2023-12-06T20:11:23.465+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json
5.99,25,20050801,676,10324,1,,2023-12-06T20:11:23.465+0000,dbfs:/FileStore/capstone_data/movies/stream/payments/sakila_payment3.json


In [0]:
%sql
DESCRIBE EXTENDED payments_silver_tempview

col_name,data_type,comment
amount,double,
customer_id,bigint,
payment_date_key,bigint,
payment_id,bigint,
rental_id,bigint,
staff_id,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
-- Creating a new Temporary View named "fact_payments_silver_tempview" by selecting data from
-- "payments_silver_tempview" and joining it to the Customer, Store, and Date dimension tables.
-- Date dimension can serve as a "Role Playing" dimension by being Joined upon multiple times.

CREATE OR REPLACE TEMPORARY VIEW fact_payments_silver_tempview AS (
  SELECT p.payment_id,
      p.customer_id,
      c.store_id AS customer_store,
      c.first_name AS customer_first_name,
      c.last_name AS customer_last_name,
      c.email AS customer_email,
      c.create_date AS customer_create_date,
      c.last_update AS customer_last_update,
      p.staff_id,
      s.store_id AS store,
      s.address_id AS store_address,
      s.last_update AS store_last_update,
      p.rental_id,
      p.amount,
      p.payment_date_key,
      pd.day_name_of_week AS payment_day_name_of_week,
      pd.day_of_month AS payment_day_of_month,
      pd.weekday_weekend AS payment_weekday_weekend,
      pd.month_name AS payment_month_name,
      pd.calendar_quarter AS payment_calendar_quarter,
      pd.calendar_year AS payment_calendar_year
  FROM payments_silver_tempview AS p
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_id = p.customer_id
  INNER JOIN sakila_dlh.dim_store AS s
  ON s.manager_staff_id = p.staff_id
  LEFT OUTER JOIN sakila_dlh.dim_date AS pd
  ON pd.date_key = p.payment_date_key
)

In [0]:
%sql
SELECT * FROM fact_payments_silver_tempview LIMIT 5

payment_id,customer_id,customer_store,customer_first_name,customer_last_name,customer_email,customer_create_date,customer_last_update,staff_id,store,store_address,store_last_update,rental_id,amount,payment_date_key,payment_day_name_of_week,payment_day_of_month,payment_weekday_weekend,payment_month_name,payment_calendar_quarter,payment_calendar_year
32,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1,1,1,2006-02-15 04:57:12,15315,5.99,20050822,Monday,22,Weekday,August,3,2005
59,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,2,2,2,2006-02-15 04:57:12,15907,4.99,20050823,Tuesday,23,Weekday,August,3,2005
85,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1,1,1,2006-02-15 04:57:12,15619,2.99,20050823,Tuesday,23,Weekday,August,3,2005
107,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,2,2,2,2006-02-15 04:57:12,15635,1.99,20050823,Tuesday,23,Weekday,August,3,2005
145,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,2,2,2,2006-02-15 04:57:12,13209,0.99,20060214,Tuesday,14,Weekday,February,1,2006


In [0]:
(spark.table("fact_payments_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{payments_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_payments_silver"))

Out[40]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fe534b6da30>

In [0]:
%sql
SELECT * FROM fact_payments_silver

payment_id,customer_id,customer_store,customer_first_name,customer_last_name,customer_email,customer_create_date,customer_last_update,staff_id,store,store_address,store_last_update,rental_id,amount,payment_date_key,payment_day_name_of_week,payment_day_of_month,payment_weekday_weekend,payment_month_name,payment_calendar_quarter,payment_calendar_year
32,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1,1,1,2006-02-15 04:57:12,15315,5.99,20050822,Monday,22,Weekday,August,3,2005
59,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,2,2,2,2006-02-15 04:57:12,15907,4.99,20050823,Tuesday,23,Weekday,August,3,2005
85,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1,1,1,2006-02-15 04:57:12,15619,2.99,20050823,Tuesday,23,Weekday,August,3,2005
107,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,2,2,2,2006-02-15 04:57:12,15635,1.99,20050823,Tuesday,23,Weekday,August,3,2005
145,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,2,2,2,2006-02-15 04:57:12,13209,0.99,20060214,Tuesday,14,Weekday,February,1,2006
173,6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,2,2,2,2006-02-15 04:57:12,15603,0.99,20050823,Tuesday,23,Weekday,August,3,2005
206,7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1,1,1,2006-02-15 04:57:12,14222,5.99,20050821,Sunday,21,Weekend,August,3,2005
230,8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1,1,1,2006-02-15 04:57:12,15805,4.99,20050823,Tuesday,23,Weekday,August,3,2005
253,9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1,1,1,2006-02-15 04:57:12,15813,4.99,20060214,Tuesday,14,Weekday,February,1,2006
278,10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1,1,1,2006-02-15 04:57:12,15370,5.99,20050822,Monday,22,Weekday,August,3,2005


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_payments_silver

col_name,data_type,comment
payment_id,bigint,
customer_id,bigint,
customer_store,int,
customer_first_name,string,
customer_last_name,string,
customer_email,string,
customer_create_date,string,
customer_last_update,string,
staff_id,bigint,
store,int,


##### 6.3. Gold Table: Perform Aggregations
Using the Gold table to perform aggregations on payments. The table includes the total number of payments by each customer, including information about which store, month, and year purchases were made made in.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_payments_by_customer_gold AS (
  SELECT customer_id AS Customer
    , customer_last_name AS LastName
    , customer_first_name AS FirstName
    , store AS StoreName
    , payment_month_name AS MonthName
    , payment_calendar_year AS YearName
    , COUNT(amount) AS TotalPayments
  FROM sakila_dlh.fact_payments_silver AS p
  GROUP BY Customer, LastName, FirstName, StoreName, MonthName, YearName
  ORDER BY TotalPayments DESC);

SELECT * FROM sakila_dlh.fact_payments_by_customer_gold;

Customer,LastName,FirstName,StoreName,MonthName,YearName,TotalPayments
35,GREEN,VIRGINIA,1,July,2005,11
26,HALL,JESSICA,1,July,2005,10
30,KING,MELISSA,2,July,2005,10
29,HERNANDEZ,ANGELA,1,August,2005,10
15,HARRIS,HELEN,2,August,2005,10
14,WHITE,BETTY,1,July,2005,9
30,KING,MELISSA,1,August,2005,9
5,BROWN,ELIZABETH,2,July,2005,9
6,DAVIS,JENNIFER,1,August,2005,9
13,JACKSON,KAREN,1,July,2005,9


Databricks visualization. Run in Databricks to view.

#### 7.0. Using AutoLoader to Process Streaming (Hot Path) Rentals Fact Data 
##### 7.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "rental_id BIGINT")
 #.option("cloudFiles.schemaHints", "rental_date STRING")
 #.option("cloudFiles.schemaHints", "inventory_id BIGINT")
 #.option("cloudFiles.schemaHints", "customer_id BIGINT") 
 #.option("cloudFiles.schemaHints", "return_date STRING")
 #.option("cloudFiles.schemaHints", "staff_id BIGINT")
 #.option("cloudFiles.schemaHints", "last_update STRING") 
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))


In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

customer_id,inventory_id,rental_date_key,rental_id,return_date_key,staff_id,_rescued_data,receipt_time,source_file
596,797,20050528,667,20050531,1,,2023-12-06T20:20:08.580+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
484,3528,20050528,668,20050529,1,,2023-12-06T20:20:08.580+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
313,3677,20050528,669,20050603,1,,2023-12-06T20:20:08.580+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
201,227,20050528,670,20050606,2,,2023-12-06T20:20:08.580+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
14,1027,20050528,671,20050603,2,,2023-12-06T20:20:08.580+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
306,697,20050528,672,20050606,2,,2023-12-06T20:20:08.580+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
468,1769,20050528,673,20050601,1,,2023-12-06T20:20:08.580+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
87,1150,20050528,674,20050601,2,,2023-12-06T20:20:08.580+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
338,1273,20050528,675,20050601,2,,2023-12-06T20:20:08.580+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
490,2329,20050528,676,20050529,2,,2023-12-06T20:20:08.580+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

Out[47]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fe534b7fc40>

##### 7.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

customer_id,inventory_id,rental_date_key,rental_id,return_date_key,staff_id,_rescued_data,receipt_time,source_file
596,797,20050528,667,20050531,1,,2023-12-06T20:20:31.035+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
484,3528,20050528,668,20050529,1,,2023-12-06T20:20:31.035+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
313,3677,20050528,669,20050603,1,,2023-12-06T20:20:31.035+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
201,227,20050528,670,20050606,2,,2023-12-06T20:20:31.035+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
14,1027,20050528,671,20050603,2,,2023-12-06T20:20:31.035+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
306,697,20050528,672,20050606,2,,2023-12-06T20:20:31.035+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
468,1769,20050528,673,20050601,1,,2023-12-06T20:20:31.035+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
87,1150,20050528,674,20050601,2,,2023-12-06T20:20:31.035+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
338,1273,20050528,675,20050601,2,,2023-12-06T20:20:31.035+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json
490,2329,20050528,676,20050529,2,,2023-12-06T20:20:31.035+0000,dbfs:/FileStore/capstone_data/movies/stream/rentals/sakila_rental3.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_id,bigint,
inventory_id,bigint,
rental_date_key,bigint,
rental_id,bigint,
return_date_key,bigint,
staff_id,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
-- Create a new Temporary View named "fact_rentals_silver_tempview" by selecting data from
-- "rentals_silver_tempview" and joining it to the Customer, Inventory, Store and Date dimension tables.
-- Date dimension can serve as a "Role Playing" dimension by being Joined upon multiple times.

CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.rental_id,
      r.customer_id,
      c.store_id AS customer_store,
      c.first_name AS customer_first_name,
      c.last_name AS customer_last_name,
      c.email AS customer_email,
      c.create_date AS customer_create_date,
      c.last_update AS customer_last_update,
      r.inventory_id,
      i.film_id AS film,
      i.store_id AS stores,
      i.last_update AS inventory_last_update,
      r.staff_id,
      s.store_id AS store,
      s.address_id AS store_address,
      s.last_update AS store_last_update,
      r.rental_date_key,
      rd.day_name_of_week AS rental_day_name_of_week,
      rd.day_of_month AS rental_day_of_month,
      rd.weekday_weekend AS rental_weekday_weekend,
      rd.month_name AS rental_month_name,
      rd.calendar_quarter AS rental_calendar_quarter,
      rd.calendar_year AS rental_calendar_year,
      r.return_date_key,
      rtd.day_name_of_week AS return_day_name_of_week,
      rtd.day_of_month AS return_day_of_month,
      rtd.weekday_weekend AS return_weekday_weekend,
      rtd.month_name AS return_month_name,
      rtd.calendar_quarter AS return_calendar_quarter,
      rtd.calendar_year AS return_calendar_year
  FROM rentals_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_id = r.customer_id
  INNER JOIN sakila_dlh.dim_store AS s
  ON s.manager_staff_id = r.staff_id
  INNER JOIN sakila_dlh.dim_inventory AS i
  ON i.inventory_id = r.inventory_id
  LEFT OUTER JOIN sakila_dlh.dim_date AS rd
  ON rd.date_key = r.rental_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rtd
  ON rtd.date_key = r.return_date_key
)

In [0]:
%sql
SELECT * FROM fact_rentals_silver_tempview LIMIT 5

rental_id,customer_id,customer_store,customer_first_name,customer_last_name,customer_email,customer_create_date,customer_last_update,inventory_id,film,stores,inventory_last_update,staff_id,store,store_address,store_last_update,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_calendar_quarter,rental_calendar_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_calendar_quarter,return_calendar_year
76,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,3021,663,2,2006-02-15T05:09:17.000+0000,2,2,2,2006-02-15 04:57:12,20050525,Wednesday,25,Weekday,May,2,2005,20050603,Friday,3,Weekday,June,2,2005
320,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1090,243,1,2006-02-15T05:09:17.000+0000,2,2,2,2006-02-15 04:57:12,20050527,Friday,27,Weekday,May,2,2005,20050528,Saturday,28,Weekend,May,2,2005
435,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,3328,732,1,2006-02-15T05:09:17.000+0000,2,2,2,2006-02-15 04:57:12,20050527,Friday,27,Weekday,May,2,2005,20050602,Thursday,2,Weekday,June,2,2005
731,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,4124,896,2,2006-02-15T05:09:17.000+0000,1,1,1,2006-02-15 04:57:12,20050529,Sunday,29,Weekend,May,2,2005,20050530,Monday,30,Weekday,May,2,2005
57,6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,3938,858,2,2006-02-15T05:09:17.000+0000,2,2,2,2006-02-15 04:57:12,20050525,Wednesday,25,Weekday,May,2,2005,20050529,Sunday,29,Weekend,May,2,2005


In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

Out[53]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fe534af9bb0>

In [0]:
%sql
SELECT * FROM fact_rentals_silver

rental_id,customer_id,customer_store,customer_first_name,customer_last_name,customer_email,customer_create_date,customer_last_update,inventory_id,film,stores,inventory_last_update,staff_id,store,store_address,store_last_update,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_calendar_quarter,rental_calendar_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_calendar_quarter,return_calendar_year
76,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,3021,663,2,2006-02-15T05:09:17.000+0000,2,2,2,2006-02-15 04:57:12,20050525,Wednesday,25,Weekday,May,2,2005,20050603,Friday,3,Weekday,June,2,2005
320,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1090,243,1,2006-02-15T05:09:17.000+0000,2,2,2,2006-02-15 04:57:12,20050527,Friday,27,Weekday,May,2,2005,20050528,Saturday,28,Weekend,May,2,2005
435,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,3328,732,1,2006-02-15T05:09:17.000+0000,2,2,2,2006-02-15 04:57:12,20050527,Friday,27,Weekday,May,2,2005,20050602,Thursday,2,Weekday,June,2,2005
731,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,4124,896,2,2006-02-15T05:09:17.000+0000,1,1,1,2006-02-15 04:57:12,20050529,Sunday,29,Weekend,May,2,2005,20050530,Monday,30,Weekday,May,2,2005
57,6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,3938,858,2,2006-02-15T05:09:17.000+0000,2,2,2,2006-02-15 04:57:12,20050525,Wednesday,25,Weekday,May,2,2005,20050529,Sunday,29,Weekend,May,2,2005
117,7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,4278,931,2,2006-02-15T05:09:17.000+0000,2,2,2,2006-02-15 04:57:12,20050525,Wednesday,25,Weekday,May,2,2005,20050531,Tuesday,31,Weekday,May,2,2005
866,8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,2867,628,2,2006-02-15T05:09:17.000+0000,1,1,1,2006-02-15 04:57:12,20050530,Monday,30,Weekday,May,2,2005,20050608,Wednesday,8,Weekday,June,2,2005
350,9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,2756,604,2,2006-02-15T05:09:17.000+0000,2,2,2,2006-02-15 04:57:12,20050527,Friday,27,Weekday,May,2,2005,20050604,Saturday,4,Weekend,June,2,2005
987,11,2,LISA,ANDERSON,LISA.ANDERSON@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,390,86,1,2006-02-15T05:09:17.000+0000,1,1,1,2006-02-15 04:57:12,20050530,Monday,30,Weekday,May,2,2005,20050607,Tuesday,7,Weekday,June,2,2005
988,12,1,NANCY,THOMAS,NANCY.THOMAS@sakilacustomer.org,2006-02-14 22:04:36,2006-02-15 04:57:20,1364,301,1,2006-02-15T05:09:17.000+0000,1,1,1,2006-02-15 04:57:12,20050530,Monday,30,Weekday,May,2,2005,20050607,Tuesday,7,Weekday,June,2,2005


In [0]:
%sql
DESCRIBE EXTENDED fact_rentals_silver

col_name,data_type,comment
rental_id,bigint,
customer_id,bigint,
customer_store,int,
customer_first_name,string,
customer_last_name,string,
customer_email,string,
customer_create_date,string,
customer_last_update,string,
inventory_id,bigint,
film,int,


##### 7.3. Gold Table: Perform Aggregations
Using the Gold table to perform aggregations on rentals. The table includes information on each film including inventory levels, key dates, and the customer in possession.

In [0]:
%sql

CREATE OR REPLACE TABLE sakila_dlh.fact_film_rentals_gold AS (
  SELECT r.customer_last_name AS LastName
    , r.rental_id AS Rental
    , r.rental_month_name AS RentalMonth
    , r.rental_calendar_year AS RentalYear
    , r.return_month_name AS ReturnMonth
    , r.return_calendar_year AS ReturnYear
    , f.title AS FilmName
    , COUNT (i.inventory_id) AS FilmLevel
  FROM sakila_dlh.fact_rentals_silver AS r
  LEFT OUTER JOIN sakila_dlh.dim_film AS f
  ON r.film = f.film_key
  LEFT OUTER JOIN sakila_dlh.dim_inventory AS i
  ON r.inventory_id = i.inventory_id
  GROUP BY LastName, FilmName, Rental, RentalMonth, RentalYear, ReturnMonth, ReturnYear
  ORDER BY FilmLevel);

SELECT * FROM sakila_dlh.fact_film_rentals_gold;

LastName,Rental,RentalMonth,RentalYear,ReturnMonth,ReturnYear,FilmName,FilmLevel
SHELLEY,967,May,2005,June,2005,HOME PITY,1
BRINSON,847,May,2005,June,2005,WARDROBE PHANTOM,1
BOWENS,461,May,2005,June,2005,IDOLS SNATCHERS,1
BAKER,923,May,2005,June,2005,HEAVEN FREEDOM,1
COUGHLIN,863,May,2005,June,2005,SWEDEN SHINING,1
RHOADS,713,May,2005,June,2005,SINNERS ATLANTIS,1
LAWTON,368,May,2005,June,2005,EGYPT TENENBAUMS,1
MORRELL,411,May,2005,June,2005,FIGHT JAWBREAKER,1
HARDISON,95,May,2005,June,2005,ARMAGEDDON LOST,1
ADAM,470,May,2005,June,2005,GATHERING CALENDAR,1


Databricks visualization. Run in Databricks to view.

#### 8.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/capstone_data/