## Mina Gorani (CAP9ET) DS2002 Capstone Fall 2023

Payment, customer, and staff dimension tables from my midterm along with the fact table were loaded into this script to mimic a system with hot and cold path data. The staff dimension was grabbed from a connection with Azure MySQL, the customer dimension was placed and grabbed from MongoDB, and the payment dimension was grabbed from a CSV placed into the DBFS from my PC. 

The fact table from my midterm midterm was exported from MySQL into four smaller files and uploaded to the DBFS as well. 

All files uploaded to the DBFS are located in the capstone_data folder on github.

### Section I: Prerequisites

#### 1.0 Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ################### 

jdbc_hostname = "cap9et-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dm"

#use azure login info, not mysql info 

connection_properties = {
  "user" : "cap9et",
  "password" : "Ch1pmunk!123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "DS2002Cluster.m09nd7q"
atlas_database_name = "sakila_dm"
atlas_user_name = "cap9et"
atlas_password = "Ch1pmunk!123"


# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/capstone_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/sakila_retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rental_payment_stream_dir = f"{stream_dir}/rental_payment"

rental_payment_bronze = f"{database_dir}/fact_rental_payment/bronze"
rental_payment_silver = f"{database_dir}/fact_rental_payment/silver"
rental_payment_gold   = f"{database_dir}/fact_rental_payment/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rental_payment", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[44]: True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Capstone Database"
LOCATION "dbfs:/FileStore/capstone_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://cap9et-mysql.mysql.database.azure.com:3306/sakila_dm", --Replace with your Server Name
  dbtable "dim_date",
  user "cap9et",
  password "Ch1pmunk!123"
  )
  

In [0]:
%sql
-- select from temp view to check if it exists 

SELECT * FROM view_date

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000106,2000-01-06,2000/01/06,01/06/2000,06/01/2000,5,Thursday,6,6,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000107,2000-01-07,2000/01/07,01/07/2000,07/01/2000,6,Friday,7,7,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000108,2000-01-08,2000/01/08,01/08/2000,08/01/2000,7,Saturday,8,8,Weekend,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000109,2000-01-09,2000/01/09,01/09/2000,09/01/2000,1,Sunday,9,9,Weekend,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000110,2000-01-10,2000/01/10,01/10/2000,10/01/2000,2,Monday,10,10,Weekday,2,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/capstone_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_staff" that extracts data from your MySQL sakila_dm database.

CREATE OR REPLACE TEMPORARY VIEW view_staff
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://cap9et-mysql.mysql.database.azure.com:3306/sakila_dm", 
  dbtable "dim_staff",
  user "cap9et",
  password "Ch1pmunk!123"
  )
  

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_staff" using data from the view named "view_staff"

CREATE OR REPLACE TABLE sakila_dlh.dim_staff
COMMENT "Staff Dimension Table"
LOCATION "dbfs:/FileStore/capstone_data/sakila_dlh/dim_staff"
AS SELECT * FROM view_staff

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_key,bigint,
staff_first_name,string,
staff_last_name,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_staff,
Created Time,Wed Dec 06 03:46:10 UTC 2023,
Last Access,UNKNOWN,


In [0]:
%sql
 SELECT * FROM sakila_dlh.dim_staff LIMIT 5


staff_key,staff_first_name,staff_last_name
1,Mike,Hillyer
2,Jon,Stephens


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/capstone_data/sakila_retail/batch'

path,name,size,modificationTime
dbfs:/FileStore/capstone_data/sakila_retail/batch/sakila_dim_customer.json,sakila_dim_customer.json,94383,1701830923000
dbfs:/FileStore/capstone_data/sakila_retail/batch/sakila_dim_payment.csv,sakila_dim_payment.csv,8917,1701750563000
dbfs:/FileStore/capstone_data/sakila_retail/batch/sakila_dim_staff.json,sakila_dim_staff.json,181,1701750564000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/capstone_data/sakila_retail/batch'
json_files = {"customer" : 'sakila_dim_customer.json'
              }

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[58]: <pymongo.results.InsertManyResult at 0x7f86b4a6a980>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
//Mongo Info//

import com.mongodb.spark._

val userName = "cap9et"
val pwd = "Ch1pmunk!123"
val clusterName = "ds2002cluster.m09nd7q"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri) /** dont need to have atlas_uri if everything else was configured correctly? **/
.option("database", "sakila_dm")
.option("collection", "customer").load()
.select("customer_key", "customer_first_name","customer_last_name", "customer_email")

display(df_customer)

customer_key,customer_first_name,customer_last_name,customer_email
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org
3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org
4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org
5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org
6,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org
7,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org
8,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org
9,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org
10,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org


In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
customer_first_name,string,
customer_last_name,string,
customer_email,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_customer,
Created Time,Wed Dec 06 03:47:14 UTC 2023,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,customer_first_name,customer_last_name,customer_email
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org
3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org
4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org
5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
payment_csv = f"{batch_dir}/sakila_dim_payment.csv"

df_payment = spark.read.format('csv').options(header='true', inferSchema='true').load(payment_csv)
display(df_payment)

payment_key,amount
1,2.99
2,0.99
3,5.99
4,0.99
5,9.99
6,4.99
7,4.99
8,0.99
9,3.99
10,5.99


In [0]:
df_payment.printSchema()

root
 |-- payment_key: integer (nullable = true)
 |-- amount: double (nullable = true)



In [0]:
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_payment")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payment;

col_name,data_type,comment
payment_key,int,
amount,double,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_payment,
Created Time,Wed Dec 06 03:47:47 UTC 2023,
Last Access,UNKNOWN,
Created By,Spark 3.3.0,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_payment LIMIT 5;

payment_key,amount
1,2.99
2,0.99
3,5.99
4,0.99
5,9.99


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_payment,False
sakila_dlh,dim_staff,False
,bronze_tempview,True
,display_query_1,True
,display_query_2,True
,rental_payment_raw_tempview,True
,rental_payment_silver_tempview,True
,view_customer,True


### Section III: Integrate Reference Data with Real-Time Data
#### 1.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 1.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_rental_purchase_key BIGINT")
 .option("cloudFiles.schemaHints", "rental_date_key BIGINT")
 .option("cloudFiles.schemaHints", "return_date_key DOUBLE")
 .option("cloudFiles.schemaHints", "payment_key DOUBLE") 
 .option("cloudFiles.schemaHints", "payment_date_key BIGINT")
 .option("cloudFiles.schemaHints", "inventory_key DOUBLE")
 .option("cloudFiles.schemaHints", "customer_key BIGINT")
 .option("cloudFiles.schemaHints", "staff_key DOUBLE")
 .option("cloudFiles.schemaHints", "store_key BIGINT") 
 .option("cloudFiles.schemaHints", "address_key BIGINT")
 .option("cloudFiles.schemaLocation", rental_payment_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rental_payment_stream_dir)
 .createOrReplaceTempView("rental_payment_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rental_payment_raw_tempview
)

In [0]:
%sql
SELECT * FROM bronze_tempview

address_key,customer_key,fact_rental_purchase_key,inventory_key,payment_date_key,payment_key,rental_date_key,return_date_key,staff_key,store_key,_rescued_data,receipt_time,source_file
22,18,751,904,20050801,485,20050801,20050809.0,2,2,,2023-12-06T03:48:47.525+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_04_1000.json
22,18,752,4370,20050818,488,20050818,20050821.0,2,2,,2023-12-06T03:48:47.525+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_04_1000.json
22,18,753,238,20050820,489,20050820,20050821.0,2,2,,2023-12-06T03:48:47.525+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_04_1000.json
23,19,754,3376,20050525,490,20050525,20050531.0,2,1,,2023-12-06T03:48:47.525+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_04_1000.json
23,19,755,4108,20050525,491,20050525,20050603.0,2,1,,2023-12-06T03:48:47.525+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_04_1000.json
23,19,756,377,20050528,494,20050528,20050529.0,2,1,,2023-12-06T03:48:47.525+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_04_1000.json
23,19,757,3921,20050706,499,20050706,20050706.0,2,1,,2023-12-06T03:48:47.525+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_04_1000.json
23,19,758,146,20050730,501,20050730,20050805.0,2,1,,2023-12-06T03:48:47.525+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_04_1000.json
23,19,759,1181,20050731,504,20050731,20050809.0,2,1,,2023-12-06T03:48:47.525+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_04_1000.json
23,19,760,2688,20050816,505,20050816,20050825.0,2,1,,2023-12-06T03:48:47.525+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_04_1000.json


In [0]:
(spark.table("bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_payment_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_payment_bronze"))

Out[70]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f86b48df0d0>

##### 1.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rental_payment_bronze")
  .createOrReplaceTempView("rental_payment_silver_tempview"))

In [0]:
%sql
SELECT * FROM rental_payment_silver_tempview

address_key,customer_key,fact_rental_purchase_key,inventory_key,payment_date_key,payment_key,rental_date_key,return_date_key,staff_key,store_key,_rescued_data,receipt_time,source_file
5,1,501,1021,20050615,4,20050615,20050619.0,2,1,,2023-12-06T03:49:19.835+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_03_750.json
5,1,502,197,20050618,7,20050618,20050622.0,2,1,,2023-12-06T03:49:19.835+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_03_750.json
5,1,503,1443,20050708,10,20050708,20050714.0,2,1,,2023-12-06T03:49:19.835+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_03_750.json
5,1,504,3486,20050708,11,20050708,20050712.0,2,1,,2023-12-06T03:49:19.835+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_03_750.json
5,1,505,3726,20050709,12,20050709,20050714.0,2,1,,2023-12-06T03:49:19.835+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_03_750.json
5,1,506,1330,20050711,14,20050711,20050719.0,2,1,,2023-12-06T03:49:19.835+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_03_750.json
5,1,507,1092,20050728,16,20050728,20050730.0,2,1,,2023-12-06T03:49:19.835+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_03_750.json
5,1,508,108,20050729,20,20050729,20050801.0,2,1,,2023-12-06T03:49:19.835+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_03_750.json
5,1,509,2219,20050731,21,20050731,20050802.0,2,1,,2023-12-06T03:49:19.835+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_03_750.json
5,1,510,3232,20050802,23,20050802,20050810.0,2,1,,2023-12-06T03:49:19.835+0000,dbfs:/FileStore/capstone_data/sakila_retail/stream/rental_payment/sakila_fact_rental_payment_03_750.json


In [0]:
%sql
DESCRIBE EXTENDED rental_payment_silver_tempview

col_name,data_type,comment
address_key,bigint,
customer_key,bigint,
fact_rental_purchase_key,bigint,
inventory_key,bigint,
payment_date_key,bigint,
payment_key,bigint,
rental_date_key,bigint,
return_date_key,bigint,
staff_key,bigint,
store_key,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rental_payment_silver_tempview AS (
  SELECT fr.fact_rental_purchase_key,
      fr.payment_key,
      fr.inventory_key,
      fr.customer_key,
      fr.staff_key,
      fr.store_key,
      fr.address_key,
      c.customer_first_name,
      c.customer_last_name,
      c.customer_email,
      p.amount AS payment_amount,
      s.staff_first_name,
      s.staff_last_name,
      
      fr.rental_date_key,
      rd.day_name_of_week AS rental_day_name_of_week,
      rd.day_of_month AS rental_day_of_month,
      rd.weekday_weekend AS rental_weekday_weekend,
      rd.month_name AS rental_month_name,
      rd.calendar_quarter AS rental_quarter,
      rd.calendar_year AS rental_year,

      fr.return_date_key,
      red.day_name_of_week AS return_day_name_of_week,
      red.day_of_month AS return_day_of_month,
      red.weekday_weekend AS return_weekday_weekend,
      red.month_name AS return_month_name,
      red.calendar_quarter AS return_quarter,
      red.calendar_year AS return_year,

      fr.payment_date_key,
      pd.day_name_of_week AS payment_day_name_of_week,
      pd.day_of_month AS payment_day_of_month,
      pd.weekday_weekend AS payment_weekday_weekend,
      pd.month_name AS payment_month_name,
      pd.calendar_quarter AS payment_quarter,
      pd.calendar_year AS payment_year


  FROM rental_payment_silver_tempview AS fr
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = fr.customer_key
  INNER JOIN sakila_dlh.dim_payment AS p
  ON p.payment_key = fr.payment_key
  INNER JOIN sakila_dlh.dim_staff AS s
  ON s.staff_key = fr.staff_key

  LEFT OUTER JOIN sakila_dlh.dim_date AS rd
  ON rd.date_key = fr.rental_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS red
  ON red.date_key = fr.return_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS pd
  ON pd.date_key = fr.payment_date_key
)

In [0]:
(spark.table("fact_rental_payment_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_payment_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_payment_silver"))

Out[75]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f86b48e08e0>

In [0]:
%sql
SELECT * FROM fact_rental_payment_silver

fact_rental_purchase_key,payment_key,inventory_key,customer_key,staff_key,store_key,address_key,customer_first_name,customer_last_name,customer_email,payment_amount,staff_first_name,staff_last_name,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_quarter,return_year,payment_date_key,payment_day_name_of_week,payment_day_of_month,payment_weekday_weekend,payment_month_name,payment_quarter,payment_year
500,3,2785,1,2,1,5,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5.99,Jon,Stephens,20050615,Wednesday,15,Weekday,June,2,2005,20050623.0,Thursday,23.0,Weekday,June,2.0,2005.0,20050615,Wednesday,15,Weekday,June,2,2005
30,59,2898,2,1,1,6,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,4.99,Mike,Hillyer,20050823,Tuesday,23,Weekday,August,3,2005,20050825.0,Thursday,25.0,Weekday,August,3.0,2005.0,20050823,Tuesday,23,Weekday,August,3,2005
42,84,1685,3,1,1,7,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,0.99,Mike,Hillyer,20050822,Monday,22,Weekday,August,3,2005,20050823.0,Tuesday,23.0,Weekday,August,3.0,2005.0,20050822,Monday,22,Weekday,August,3,2005
51,107,3308,4,1,2,8,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,1.99,Mike,Hillyer,20050823,Tuesday,23,Weekday,August,3,2005,20050827.0,Saturday,27.0,Weekend,August,3.0,2005.0,20050823,Tuesday,23,Weekday,August,3,2005
73,144,61,5,1,1,9,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,0.99,Mike,Hillyer,20050822,Monday,22,Weekday,August,3,2005,20050825.0,Thursday,25.0,Weekday,August,3.0,2005.0,20050822,Monday,22,Weekday,August,3,2005
87,173,2565,6,1,2,10,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,0.99,Mike,Hillyer,20050823,Tuesday,23,Weekday,August,3,2005,20050828.0,Sunday,28.0,Weekend,August,3.0,2005.0,20050823,Tuesday,23,Weekday,August,3,2005
101,205,3424,7,1,1,11,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,0.99,Mike,Hillyer,20050820,Saturday,20,Weekend,August,3,2005,20050823.0,Tuesday,23.0,Weekday,August,3.0,2005.0,20050820,Saturday,20,Weekend,August,3,2005
113,229,2937,8,1,2,12,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,2.99,Mike,Hillyer,20050823,Tuesday,23,Weekday,August,3,2005,20050825.0,Thursday,25.0,Weekday,August,3.0,2005.0,20050823,Tuesday,23,Weekday,August,3,2005
142,278,67,10,1,1,14,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,5.99,Mike,Hillyer,20050822,Monday,22,Weekday,August,3,2005,20050827.0,Saturday,27.0,Weekend,August,3.0,2005.0,20050822,Monday,22,Weekday,August,3,2005
155,300,3083,11,1,2,15,LISA,ANDERSON,LISA.ANDERSON@sakilacustomer.org,0.99,Mike,Hillyer,20050822,Monday,22,Weekday,August,3,2005,20050823.0,Tuesday,23.0,Weekday,August,3.0,2005.0,20050822,Monday,22,Weekday,August,3,2005


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rental_payment_silver

col_name,data_type,comment
fact_rental_purchase_key,bigint,
payment_key,bigint,
inventory_key,bigint,
customer_key,bigint,
staff_key,bigint,
store_key,bigint,
address_key,bigint,
customer_first_name,string,
customer_last_name,string,
customer_email,string,


##### 1.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. The table should include the number of products sold per customer each Month, along with the Customers' ID, First & Last Name, and the Month in which the order was placed.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_payment_amount_by_customer_gold AS (
  SELECT customer_key AS CustomerID
    , customer_last_name AS CustomerLastName
    , customer_first_name AS CustomerFirstName
    , round(sum(payment_amount), 2) AS All_Time_Amount_Payed
  FROM sakila_dlh.fact_rental_payment_silver
  GROUP BY CustomerID, CustomerLastName, CustomerFirstName
  ORDER BY All_Time_Amount_Payed DESC
  
  );

SELECT * FROM sakila_dlh.fact_payment_amount_by_customer_gold;

CustomerID,CustomerLastName,CustomerFirstName,All_Time_Amount_Payed
21,CLARK,MICHELLE,155.65
26,HALL,JESSICA,152.66
7,MILLER,MARIA,151.67
5,BROWN,ELIZABETH,144.62
29,HERNANDEZ,ANGELA,140.64
3,WILLIAMS,LINDA,135.74
15,HARRIS,HELEN,134.68
13,JACKSON,KAREN,131.73
35,GREEN,VIRGINIA,129.68
2,JOHNSON,PATRICIA,128.73


#### 2.0 Clean up the File System

In [0]:
%fs rm -r /FileStore/capstone_data/