# DS 2002 Capstone: Structured Streaming with Sakila database

## 1: Import required libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd 
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

## 2: Instantiate global variables

In [0]:
# Secrets ######################################################
# Passwords are read from a Databricks secret scope at run time instead of
# being stored in the notebook. The scope and the two keys referenced below
# ("mysql-password", "atlas-password") must be created before running.
secret_scope = "ds2002-capstone"

# Azure MySQL Server Connection Information ###################
# NOTE(review): the %sql cells that define view_date / view_customer connect to
# "pub3ha-mysql-ws2.mysql.database.azure.com", not the host below, and the
# password literal that used to live here differed from theirs - confirm
# which server and credentials are current.
jdbc_hostname = "pub3ha-sql-server.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "pub3ha",
  "password" : dbutils.secrets.get(scope=secret_scope, key="mysql-password"),
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.dcpaosv"
atlas_database_name = "sakila_dw"
atlas_user_name = "pub3ha-mongo"
atlas_password = dbutils.secrets.get(scope=secret_scope, key="atlas-password")

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/ds2002-capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# Directory that Auto Loader watches for rental fact JSON files.
rental_stream_dir = f"{stream_dir}/fact_rental"

rental_output_bronze = f"{database_dir}/fact_rental/bronze"
rental_output_silver = f"{database_dir}/fact_rental/silver"
rental_output_gold   = f"{database_dir}/fact_rental/gold"

# Delete the Streaming Files ##################################
# (schema locations and checkpoints live under this directory)
dbutils.fs.rm(f"{database_dir}/fact_rental", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[2]: True

## 3: Define global functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Fetch documents from a MongoDB Atlas collection into a DataFrame.

    Args:
        user_id: Atlas database user name.
        pwd: Password for ``user_id``.
        cluster_name: Cluster host prefix; ``.mongodb.net`` is appended to it.
        db_name: Database to connect to and query.
        collection: Name of the collection to read.
        conditions: Filter document passed to ``find``; a falsy value means no filter.
        projection: Projection passed to ``find``; a falsy value means all fields.
        sort: Sort specification passed to ``Cursor.sort``; a falsy value means unsorted.

    Returns:
        A ``pd.DataFrame`` built from the list of matching documents.
    '''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    try:
        # Each argument is applied on its own. Previously a filter supplied
        # without a projection (or a sort supplied without both) was silently
        # ignored and the whole collection was returned.
        cursor = client[db_name][collection].find(conditions or None, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialize the cursor while the connection is still open.
        documents = list(cursor)
    finally:
        # Release the connection even when the query raises.
        client.close()

    return pd.DataFrame(documents)

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''(Re)create MongoDB collections from JSON files.

    Args:
        user_id: Atlas database user name.
        pwd: Password for ``user_id``.
        cluster_name: Cluster host prefix; ``.mongodb.net`` is appended to it.
        db_name: Database that receives the collections.
        src_file_path: Directory that contains the JSON files.
        json_files: Mapping of collection name -> JSON file name. Each file is
            parsed with ``json.load`` and its content passed to ``insert_many``.

    Returns:
        The result of the last ``insert_many`` call, or ``None`` when
        ``json_files`` is empty.
    '''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    # Defined up front so an empty ``json_files`` returns None instead of
    # raising UnboundLocalError at the return statement.
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            # Drop first so re-running the notebook does not duplicate documents.
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)
            result = db[collection_name].insert_many(documents)
    finally:
        # Release the connection even when reading or inserting fails.
        client.close()

    return result

# Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data
## 1.0. Fetch Reference Data From an Azure MySQL Database
## 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
-- Drop the lakehouse database and, via CASCADE, every table in it so the notebook rebuilds from a clean state.
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
-- Create the lakehouse metadata database at an explicit DBFS location and tag it with descriptive properties.
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Capstone Database"
LOCATION "dbfs:/FileStore/ds2002-capstone/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

## 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
-- Expose the MySQL table sakila_dw.dim_date to Spark over JDBC as the temporary view view_date.
-- NOTE(review): the user name and password are hard-coded in this cell; they should come from a
-- secret store instead - TODO confirm the preferred mechanism for %sql cells on this workspace.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://pub3ha-mysql-ws2.mysql.database.azure.com/sakila_dw",
  dbtable "dim_date",
  user "pub3ha", 
  password "Frankie.3648" 
)

In [0]:
%sql
-- Copy the rows behind the JDBC view view_date into the table sakila_dlh.dim_date,
-- stored at an explicit DBFS location.
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-capstone/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the date dimension created in this notebook (sakila_dlh, not the Northwind lab database).
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
-- Preview the date dimension created in this notebook (sakila_dlh, not the Northwind lab database).
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


## 1.3. Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_customer" that extracts dim_customer from the MySQL sakila_dw database over JDBC.
-- NOTE(review): the user name and password are hard-coded in this cell; they should come from a
-- secret store instead - TODO confirm the preferred mechanism for %sql cells on this workspace.
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://pub3ha-mysql-ws2.mysql.database.azure.com/sakila_dw",
  dbtable "dim_customer",
  user "pub3ha", 
  password "Frankie.3648" 
)

In [0]:
%sql
-- Copy the rows behind the JDBC view view_customer into the table sakila_dlh.dim_customer,
-- stored at an explicit DBFS location.
USE DATABASE sakila_dlh;


CREATE OR REPLACE TABLE sakila_dlh.dim_customer
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-capstone/sakila_dlh/dim_customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,boolean,
create_date,timestamp,
last_update,timestamp,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,True,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,True,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,True,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,True,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,True,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000


## 2.0. Fetch Reference Data from a MongoDB Atlas Database
### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-capstone/source_data/batch/dim_customer.json,dim_customer.json,160173,1682439158000
dbfs:/FileStore/ds2002-capstone/source_data/batch/dim_film.json,dim_film.json,462531,1682439158000
dbfs:/FileStore/ds2002-capstone/source_data/batch/dim_staff.csv,dim_staff.csv,309,1682439157000


### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
# Directory holding the batch JSON files; set_mongo_collection joins it with each file name and reads the file with open().
source_dir = '/dbfs/FileStore/ds2002-capstone/source_data/batch'
# Maps the target MongoDB collection name to the JSON file that populates it.
json_files = {"film" : 'dim_film.json'}

# Drops and reloads the "film" collection; the cell output is the InsertManyResult of the load.
set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[15]: <pymongo.results.InsertManyResult at 0x7f4ce6267180>

### 2.3.1. Fetch Film Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// Build the Atlas connection string at run time so the password is not stored in the notebook.
// Requires the key "atlas-password" in the Databricks secret scope "ds2002-capstone".
// NOTE(review): this cell connects as "pub3ha", while the Python configuration cell defines
// atlas_user_name = "pub3ha-mongo" - confirm which account is intended.
val atlasPassword = dbutils.secrets.get(scope = "ds2002-capstone", key = "atlas-password")
val atlasUri = s"mongodb+srv://pub3ha:${atlasPassword}@cluster0.dcpaosv.mongodb.net/sakila_dw"

// Read the "film" collection and keep only the columns used by the film dimension.
val df_film = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "sakila_dw")
  .option("collection", "film")
  .option("uri", atlasUri)
  .load()
  .select("film_id","title","description","release_year","language_id","rental_duration","rental_rate","length","replacement_cost","rating","special_features")

display(df_film)

film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,6,2.99,130,22.99,G,Deleted Scenes
6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,1,3,2.99,169,17.99,PG,Deleted Scenes
7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,1,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes"
8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,1,6,4.99,54,15.99,R,Trailers
9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,1,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes"
10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,1,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes"


In [0]:
%scala
df_film.printSchema()

### 2.3.2. Use the Spark DataFrame to Create a New Film Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film

col_name,data_type,comment
film_id,int,
title,string,
description,string,
release_year,int,
language_id,int,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,
rating,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,6,2.99,130,22.99,G,Deleted Scenes


## 3.0. Fetch Data from a File System
### 3.1. Use PySpark to Read From a CSV File

In [0]:
# Load the staff dimension from its CSV file: the first row supplies the column
# names and Spark infers each column's type from the data.
staff_csv = f"{batch_dir}/dim_staff.csv"

df_staff = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load(staff_csv)
)
display(df_staff)

staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000
2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15T03:57:16.000+0000


In [0]:
df_staff.printSchema()

root
 |-- staff_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- picture: string (nullable = true)
 |-- email: string (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_id,int,
first_name,string,
last_name,string,
address_id,int,
picture,string,
email,string,
store_id,int,
active,int,
username,string,
password,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5;

staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000
2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15T03:57:16.000+0000


## Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_staff,False
,view_customer,True
,view_date,True


## Section III: Integrate Reference Data with Real-Time Data
### 6.0. Use AutoLoader to Process Streaming (Hot Path) Rental Fact Data
### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Auto Loader ("cloudFiles") source over the rental stream directory. The inferred
# schema is tracked under the bronze output path, column types are inferred, and
# each JSON file is parsed in multi-line mode.
rental_raw_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", rental_output_bronze)
    .option("cloudFiles.inferColumnTypes", "true")
    .option("multiLine", "true")
    .load(rental_stream_dir)
)

# Make the stream queryable from %sql cells.
rental_raw_stream.createOrReplaceTempView("rental_raw_tempview")

In [0]:
%sql
-- Add metadata for traceability: when each row was received and which file it came from.
CREATE OR REPLACE TEMPORARY VIEW rental_bronze_tempview AS
SELECT
  *,
  current_timestamp() AS receipt_time,
  input_file_name()   AS source_file
FROM rental_raw_tempview

In [0]:
%sql
SELECT * FROM rental_bronze_tempview

amount,customer_id,film_id,inventory_id,last_update,payment_date,payment_id,rental_date,rental_id,return_date,staff_id,store_id,_rescued_data,receipt_time,source_file
2.99,130,80,367,2006-02-15 21:30:53,2005-05-24 22:53:30,3504,2005-05-24 22:53:30,1,2005-05-26 22:04:30,1,1,,2023-04-27T15:24:41.451+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
2.99,459,333,1525,2006-02-15 21:30:53,2005-05-24 22:54:33,12377,2005-05-24 22:54:33,2,2005-05-28 19:40:33,1,2,,2023-04-27T15:24:41.451+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
3.99,408,373,1711,2006-02-15 21:30:53,2005-05-24 23:03:39,11032,2005-05-24 23:03:39,3,2005-06-01 22:12:39,1,2,,2023-04-27T15:24:41.451+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
4.99,333,535,2452,2006-02-15 21:30:53,2005-05-24 23:04:41,8987,2005-05-24 23:04:41,4,2005-06-03 01:43:41,2,1,,2023-04-27T15:24:41.451+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
6.99,222,450,2079,2006-02-15 21:30:53,2005-05-24 23:05:21,6003,2005-05-24 23:05:21,5,2005-06-02 04:33:21,1,2,,2023-04-27T15:24:41.451+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
0.99,549,613,2792,2006-02-15 21:30:53,2005-05-24 23:08:07,14728,2005-05-24 23:08:07,6,2005-05-27 01:32:07,1,1,,2023-04-27T15:24:41.451+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
1.99,269,870,3995,2006-02-15 21:30:53,2005-05-24 23:11:53,7274,2005-05-24 23:11:53,7,2005-05-29 20:34:53,2,2,,2023-04-27T15:24:41.451+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
4.99,239,510,2346,2006-02-15 21:30:53,2005-05-24 23:31:46,6440,2005-05-24 23:31:46,8,2005-05-27 23:33:46,2,1,,2023-04-27T15:24:41.451+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
4.99,126,565,2580,2006-02-15 21:30:53,2005-05-25 00:00:40,3386,2005-05-25 00:00:40,9,2005-05-28 00:22:40,1,1,,2023-04-27T15:24:41.451+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
5.99,399,396,1824,2006-02-15 21:30:53,2005-05-25 00:02:21,10785,2005-05-25 00:02:21,10,2005-05-31 22:44:21,2,2,,2023-04-27T15:24:41.451+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json


In [0]:
# Persist the traceable raw stream to the bronze Delta table.
# trigger(availableNow=True) processes every file currently in the source directory
# and then stops; awaitTermination() blocks until that has finished. Without them the
# stream runs indefinitely in the background and the following cells, which read
# fact_rental_bronze, can run before the table has been created or populated.
bronze_query = (spark.table("rental_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_bronze}/_checkpoint")
      .outputMode("append")
      .trigger(availableNow=True)
      .table("fact_rental_bronze"))

bronze_query.awaitTermination()
bronze_query

Out[27]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f4ce5145fd0>

### 6.2. Silver Table: Include Reference Data

In [0]:
# Stream the bronze table and expose it to %sql cells as rental_silver_tempview.
bronze_stream = spark.readStream.table("fact_rental_bronze")
bronze_stream.createOrReplaceTempView("rental_silver_tempview")

In [0]:
%sql
SELECT * FROM rental_silver_tempview

amount,customer_id,film_id,inventory_id,last_update,payment_date,payment_id,rental_date,rental_id,return_date,staff_id,store_id,_rescued_data,receipt_time,source_file
2.99,130,80,367,2006-02-15 21:30:53,2005-05-24 22:53:30,3504,2005-05-24 22:53:30,1,2005-05-26 22:04:30,1,1,,2023-04-27T15:24:43.796+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
2.99,459,333,1525,2006-02-15 21:30:53,2005-05-24 22:54:33,12377,2005-05-24 22:54:33,2,2005-05-28 19:40:33,1,2,,2023-04-27T15:24:43.796+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
3.99,408,373,1711,2006-02-15 21:30:53,2005-05-24 23:03:39,11032,2005-05-24 23:03:39,3,2005-06-01 22:12:39,1,2,,2023-04-27T15:24:43.796+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
4.99,333,535,2452,2006-02-15 21:30:53,2005-05-24 23:04:41,8987,2005-05-24 23:04:41,4,2005-06-03 01:43:41,2,1,,2023-04-27T15:24:43.796+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
6.99,222,450,2079,2006-02-15 21:30:53,2005-05-24 23:05:21,6003,2005-05-24 23:05:21,5,2005-06-02 04:33:21,1,2,,2023-04-27T15:24:43.796+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
0.99,549,613,2792,2006-02-15 21:30:53,2005-05-24 23:08:07,14728,2005-05-24 23:08:07,6,2005-05-27 01:32:07,1,1,,2023-04-27T15:24:43.796+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
1.99,269,870,3995,2006-02-15 21:30:53,2005-05-24 23:11:53,7274,2005-05-24 23:11:53,7,2005-05-29 20:34:53,2,2,,2023-04-27T15:24:43.796+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
4.99,239,510,2346,2006-02-15 21:30:53,2005-05-24 23:31:46,6440,2005-05-24 23:31:46,8,2005-05-27 23:33:46,2,1,,2023-04-27T15:24:43.796+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
4.99,126,565,2580,2006-02-15 21:30:53,2005-05-25 00:00:40,3386,2005-05-25 00:00:40,9,2005-05-28 00:22:40,1,1,,2023-04-27T15:24:43.796+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json
5.99,399,396,1824,2006-02-15 21:30:53,2005-05-25 00:02:21,10785,2005-05-25 00:02:21,10,2005-05-31 22:44:21,2,2,,2023-04-27T15:24:43.796+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_rental/fact_rental.json


In [0]:
%sql
DESCRIBE EXTENDED rental_silver_tempview

col_name,data_type,comment
amount,double,
customer_id,bigint,
film_id,bigint,
inventory_id,bigint,
last_update,string,
payment_date,string,
payment_id,bigint,
rental_date,string,
rental_id,bigint,
return_date,string,


In [0]:
%sql
-- Enrich each streamed rental with staff, customer and film attributes (inner joins,
-- so rentals without a matching dimension row are dropped) and with the day of month
-- of its rental, payment and return dates (left joins on dim_date.full_date).
CREATE OR REPLACE TEMPORARY VIEW fact_rental_silver_tempview AS
SELECT
  rental.rental_id         AS fact_rental_key,
  rental.customer_id       AS customer_key,
  rental.film_id           AS film_key,
  rental.inventory_id      AS inventory_key,
  rental.staff_id          AS staff_key,
  rented_on.day_of_month   AS rental_day_of_month,
  paid_on.day_of_month     AS payment_day_of_month,
  returned_on.day_of_month AS return_day_of_month,
  rental.rental_date,
  rental.amount            AS rental_amount,
  staff.last_name          AS staff_last_name,
  staff.first_name         AS staff_first_name,
  customer.first_name      AS customer_first_name,
  customer.last_name       AS customer_last_name,
  film.title               AS film_title
FROM rental_silver_tempview AS rental
JOIN sakila_dlh.dim_staff AS staff
  ON staff.staff_id = rental.staff_id
JOIN sakila_dlh.dim_customer AS customer
  ON customer.customer_id = rental.customer_id
JOIN sakila_dlh.dim_film AS film
  ON film.film_id = rental.film_id
LEFT JOIN sakila_dlh.dim_date AS paid_on
  ON paid_on.full_date = CAST(rental.payment_date AS DATE)
LEFT JOIN sakila_dlh.dim_date AS rented_on
  ON rented_on.full_date = CAST(rental.rental_date AS DATE)
LEFT JOIN sakila_dlh.dim_date AS returned_on
  ON returned_on.full_date = CAST(rental.return_date AS DATE)

In [0]:
# Persist the enriched rental stream to the silver Delta table.
# trigger(availableNow=True) processes everything currently available from the
# source and then stops; awaitTermination() blocks until that has finished. Without
# them the stream runs indefinitely in the background and the following cells, which
# query fact_rental_silver, can run before the table has been created or populated.
silver_query = (spark.table("fact_rental_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_silver}/_checkpoint")
      .outputMode("append")
      .trigger(availableNow=True)
      .table("fact_rental_silver"))

silver_query.awaitTermination()
silver_query

Out[32]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f4ce4021940>

In [0]:
%sql
SELECT * FROM fact_rental_silver

fact_rental_key,customer_key,film_key,inventory_key,staff_key,rental_day_of_month,payment_day_of_month,return_day_of_month,rental_date,rental_amount,staff_last_name,staff_first_name,customer_first_name,customer_last_name,film_title
1,130,80,367,1,24,24,26,2005-05-24 22:53:30,2.99,Hillyer,Mike,CHARLOTTE,HUNTER,BLANKET BEVERLY
2,459,333,1525,1,24,24,28,2005-05-24 22:54:33,2.99,Hillyer,Mike,TOMMY,COLLAZO,FREAKY POCUS
3,408,373,1711,1,24,24,1,2005-05-24 23:03:39,3.99,Hillyer,Mike,MANUEL,MURRELL,GRADUATE LORD
4,333,535,2452,2,24,24,3,2005-05-24 23:04:41,4.99,Stephens,Jon,ANDREW,PURDY,LOVE SUICIDES
5,222,450,2079,1,24,24,2,2005-05-24 23:05:21,6.99,Hillyer,Mike,DELORES,HANSEN,IDOLS SNATCHERS
6,549,613,2792,1,24,24,27,2005-05-24 23:08:07,0.99,Hillyer,Mike,NELSON,CHRISTENSON,MYSTIC TRUMAN
7,269,870,3995,2,24,24,29,2005-05-24 23:11:53,1.99,Stephens,Jon,CASSANDRA,WALTERS,SWARM GOLD
8,239,510,2346,2,24,24,27,2005-05-24 23:31:46,4.99,Stephens,Jon,MINNIE,ROMERO,LAWLESS VISION
9,126,565,2580,1,25,25,28,2005-05-25 00:00:40,4.99,Hillyer,Mike,ELLEN,SIMPSON,MATRIX SNOWMAN
10,399,396,1824,2,25,25,31,2005-05-25 00:02:21,5.99,Stephens,Jon,DANNY,ISOM,HANGING DEEP


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rental_silver

col_name,data_type,comment
fact_rental_key,bigint,
customer_key,bigint,
film_key,bigint,
inventory_key,bigint,
staff_key,bigint,
rental_day_of_month,int,
payment_day_of_month,int,
return_day_of_month,int,
rental_date,string,
rental_amount,double,


### 6.3. Gold Table: Perform Aggregations
#### All rentals took place in May, so this query reports the total amount spent and the number of films rented by each customer on each day of the month.

In [0]:
%sql
-- Total amount spent and number of films rented, per customer last name and day of month,
-- largest totals first.
-- NOTE(review): grouping is by last name only, so customers who share a last name would be
-- combined - confirm whether customer_key should be part of the grouping.
SELECT
  SUM(rental_amount)  AS total_rental_amount,
  COUNT(film_title)   AS number_of_films,
  customer_last_name  AS customer,
  rental_day_of_month AS day_in_May
FROM sakila_dlh.fact_rental_silver
GROUP BY customer_last_name, rental_day_of_month
ORDER BY total_rental_amount DESC

total_rental_amount,number_of_films,customer,day_in_May
19.97,3,SULLIVAN,27
16.98,2,HAUSER,29
15.98,2,RAPP,26
15.97,3,PARKER,26
14.98,2,MENDOZA,27
13.98,2,GRANT,30
13.98,2,GOODEN,28
12.98,2,JACOBS,27
12.98,2,BUSTAMANTE,28
12.98,2,JAMES,26
