## Final Capstone Project
In my final project, I will be 

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ssa4ec-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "ssa4ec",
  "password" : "Passw0rd123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.lsmgeeb"
atlas_database_name = "sakila_dw"
atlas_user_name = "ssa4ec"
atlas_password = "Passw0rd123"

mongo_uri = f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net/{atlas_database_name}"
print(mongo_uri)

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/ds2002-final"
database_dir = f"{base_dir}/{dst_database}"

#data_dir = f"{base_dir}/source_data"
batch_dir = f"{base_dir}/batch"
stream_dir = f"{base_dir}/stream"

filmrentals_stream_dir = f"{stream_dir}" #fact table data

filmrentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
filmrentals_output_silver = f"{database_dir}/fact_rentals/silver"
filmrentals_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

mongodb+srv://ssa4ec:Passw0rd123@cluster0.lsmgeeb.mongodb.net/sakila_dw
Out[199]: True

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ssa4ec-mysql.mysql.database.azure.com:3306/sakila_dw",
  dbtable "dim_date",
  user "sabdulali",
  password "Passw0rd123"
)


In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Staff Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_product" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_staff
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ssa4ec-mysql.mysql.database.azure.com:3306/sakila_dw",
  dbtable "dim_staff",
  user "sabdulali",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_staff
COMMENT "Staff Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_staff"
AS SELECT * FROM view_staff

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_key,bigint,
first_name,string,
last_name,string,
email,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_staff,
Type,EXTERNAL,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5

staff_key,first_name,last_name,email
1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com
2,Jon,Stephens,Jon.Stephens@sakilastaff.com


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-final/batch/sakiladw_customers.csv,sakiladw_customers.csv,30327,1683256149000
dbfs:/FileStore/ds2002-final/batch/sakiladw_payment.json,sakiladw_payment.json,154950,1683256151000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002-final/batch'
json_files = {"payment" : 'sakiladw_payment.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[212]: <pymongo.results.InsertManyResult at 0x7ff8e96d07c0>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_payment = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw").option("collection", "payment").load()
.select("payment_key","customer_key","staff_key","rental_key","amount","payment_date")

display(df_payment)

payment_key,customer_key,staff_key,rental_key,amount,payment_date
1,1,1,76,2.99,2005-05-25 11:30:37
2,1,1,573,0.99,2005-05-28 10:35:23
3,1,1,1185,5.99,2005-06-15 00:54:12
4,1,2,1422,0.99,2005-06-15 18:02:53
5,1,2,1476,9.99,2005-06-15 21:08:46
6,1,1,1725,4.99,2005-06-16 15:18:57
7,1,1,2308,4.99,2005-06-18 08:41:48
8,1,2,2363,0.99,2005-06-18 13:33:59
9,1,1,3284,3.99,2005-06-21 06:24:45
10,1,2,4526,5.99,2005-07-08 03:17:05


In [0]:
%scala
df_payment.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_payment")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payment

col_name,data_type,comment
payment_key,int,
customer_key,int,
staff_key,int,
rental_key,int,
amount,double,
payment_date,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_payment LIMIT 5

payment_key,customer_key,staff_key,rental_key,amount,payment_date
1,1,1,76,2.99,2005-05-25 11:30:37
2,1,1,573,0.99,2005-05-28 10:35:23
3,1,1,1185,5.99,2005-06-15 00:54:12
4,1,2,1422,0.99,2005-06-15 18:02:53
5,1,2,1476,9.99,2005-06-15 21:08:46


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
customer_csv = f"{batch_dir}/sakiladw_customers.csv"

df_customer = spark.read.format('csv').options(header='true', inferSchema='true').load(customer_csv)
display(df_customer)

customer_key,first_name,last_name,email
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org
3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org
4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org
5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org
6,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org
7,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org
8,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org
9,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org
10,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org


In [0]:
df_customer.printSchema()

root
 |-- customer_key: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)



In [0]:
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,int,
first_name,string,
last_name,string,
email,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_customer,
Type,MANAGED,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5;

customer_key,first_name,last_name,email
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org
3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org
4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org
5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_payment,False
sakila_dlh,dim_staff,False
,display_query_10,True
,display_query_11,True
,display_query_12,True
,display_query_9,True
,fact_filmrentals_silver_tempview,True
,filmrentals_bronze_tempview,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "customer_key BIGINT")
 .option("cloudFiles.schemaHints", "staff_key BIGINT")
 .option("cloudFiles.schemaHints", "inventory_key BIGINT")
 .option("cloudFiles.schemaHints", "payment_key BIGINT") 
 .option("cloudFiles.schemaHints", "film_key BIGINT")
 #.option("cloudFiles.schemaHints", "payment DECIMAL") #HAD TO DROP THESE 5 COLUMNS BELOW IN MY ORIGINAL FACT TABLE FROM THE MIDTERM, SO I MAY ADD THEM IN LATER WHEN CREATING fact_filmrentals_silver_tempview, I COMMENTED THEM OUT HERE TO SHOW I DROPPED THEM
#  .option("cloudFiles.schemaHints", "customer_first_name STRING")
#  .option("cloudFiles.schemaHints", "customer_last_name STRING")
#  .option("cloudFiles.schemaHints", "staff_first_name STRING") 
#  .option("cloudFiles.schemaHints", "staff_last_name STRING") 
 .option("cloudFiles.schemaHints", "title STRING")
 .option("cloudFiles.schemaHints", "rental_rate DECIMAL")
 .option("cloudFiles.schemaHints", "rating STRING")
 .option("cloudFiles.schemaHints", "rental_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "return_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "payment_date_key DECIMAL")
 .option("cloudFiles.schemaLocation", filmrentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(filmrentals_stream_dir)
 .createOrReplaceTempView("filmrentals_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW filmrentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM filmrentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM filmrentals_bronze_tempview

customer_key,film_key,inventory_key,payment_date_key,payment_key,rating,rental_date_key,rental_key,rental_rate,return_date_key,staff_key,title,_rescued_data,receipt_time,source_file
19,164,751,,493,PG,,337,0.99,,1,COAST RAINBOW,,2023-05-07T20:41:40.425+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
14,941,4319,,359,NC-17,,346,4.99,,2,VIDEOTAPE ARSENIC,,2023-05-07T20:41:40.425+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
36,641,2920,,984,PG-13,,349,0.99,,2,ORANGE GRAPES,,2023-05-07T20:41:40.425+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
9,604,2756,,231,G,,350,0.99,,2,MULAN MOON,,2023-05-07T20:41:40.425+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
22,735,3347,,579,PG-13,,370,2.99,,2,ROBBERS JOON,,2023-05-07T20:41:40.425+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
28,96,434,,750,PG-13,,388,2.99,,1,BREAKING HOME,,2023-05-07T20:41:40.425+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
35,618,2815,,953,PG,,424,0.99,,1,NECKLACE OUTBREAK,,2023-05-07T20:41:40.425+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
3,732,3328,,60,G,,435,0.99,,2,RINGS HEARTBREAKERS,,2023-05-07T20:41:40.425+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
21,181,826,,545,PG-13,,463,2.99,,1,CONTACT ANONYMOUS,,2023-05-07T20:41:40.425+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
32,330,1510,,878,NC-17,,483,4.99,,1,FORRESTER COMANCHEROS,,2023-05-07T20:41:40.425+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json


In [0]:
(spark.table("filmrentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{filmrentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_filmrentals_bronze"))

Out[224]: <pyspark.sql.streaming.query.StreamingQuery at 0x7ff8e94e5ac0>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_filmrentals_bronze")
  .createOrReplaceTempView("filmrentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM filmrentals_silver_tempview

customer_key,film_key,inventory_key,payment_date_key,payment_key,rating,rental_date_key,rental_key,rental_rate,return_date_key,staff_key,title,_rescued_data,receipt_time,source_file
19,164,751,,493,PG,,337,0.99,,1,COAST RAINBOW,,2023-05-07T20:41:52.664+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
14,941,4319,,359,NC-17,,346,4.99,,2,VIDEOTAPE ARSENIC,,2023-05-07T20:41:52.664+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
36,641,2920,,984,PG-13,,349,0.99,,2,ORANGE GRAPES,,2023-05-07T20:41:52.664+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
9,604,2756,,231,G,,350,0.99,,2,MULAN MOON,,2023-05-07T20:41:52.664+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
22,735,3347,,579,PG-13,,370,2.99,,2,ROBBERS JOON,,2023-05-07T20:41:52.664+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
28,96,434,,750,PG-13,,388,2.99,,1,BREAKING HOME,,2023-05-07T20:41:52.664+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
35,618,2815,,953,PG,,424,0.99,,1,NECKLACE OUTBREAK,,2023-05-07T20:41:52.664+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
3,732,3328,,60,G,,435,0.99,,2,RINGS HEARTBREAKERS,,2023-05-07T20:41:52.664+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
21,181,826,,545,PG-13,,463,2.99,,1,CONTACT ANONYMOUS,,2023-05-07T20:41:52.664+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json
32,330,1510,,878,NC-17,,483,4.99,,1,FORRESTER COMANCHEROS,,2023-05-07T20:41:52.664+0000,dbfs:/FileStore/ds2002-final/stream/fact_rentals2.json


In [0]:
%sql
DESCRIBE EXTENDED filmrentals_silver_tempview

col_name,data_type,comment
customer_key,bigint,
film_key,bigint,
inventory_key,bigint,
payment_date_key,"decimal(10,0)",
payment_key,bigint,
rating,string,
rental_date_key,string,
rental_key,bigint,
rental_rate,double,
return_date_key,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_filmrentals_silver_tempview AS (
  SELECT rs.rental_key
    , rs.customer_key
    , rs.staff_key
    , rs.inventory_key
    , rs.payment_key
    , rs.film_key
    , rs.title
    , rs.rental_rate
    , rs.rating

    , s.first_name AS staff_first_name
    , s.last_name AS staff_last_name
    , s.email AS staff_email

    , c.first_name AS customer_first_name
    , c.last_name AS customer_last_name
    , c.email AS customer_email

    , p.amount AS payment
    , p.payment_date AS p_date

    , rs.rental_date_key
    , rtl.day_name_of_week AS rental_day_name_of_week
    , rtl.day_of_month AS rental_day_of_month
    , rtl.weekday_weekend AS rental_weekday_weekend
    , rtl.month_name AS rental_month_name
    , rtl.calendar_quarter AS rental_quarter
    , rtl.calendar_year AS rental_year

    , rs.return_date_key
    , rtn.day_name_of_week AS return_day_name_of_week
    , rtn.day_of_month AS return_day_of_month
    , rtn.weekday_weekend AS return_weekday_weekend
    , rtn.month_name AS return_month_name
    , rtn.calendar_quarter AS return_quarter
    , rtn.calendar_year AS return_year


FROM filmrentals_silver_tempview AS rs
INNER JOIN sakila_dlh.dim_staff AS s
ON s.staff_key = rs.staff_key
INNER JOIN sakila_dlh.dim_customer AS c
ON c.customer_key = rs.customer_key
INNER JOIN sakila_dlh.dim_payment AS p
ON p.payment_key = rs.payment_key

LEFT OUTER JOIN sakila_dlh.dim_date AS rtl
ON rtl.date_key = rs.rental_date_key
LEFT OUTER JOIN sakila_dlh.dim_date AS rtn
ON rtn.date_key = rs.return_date_key
)

In [0]:
(spark.table("fact_filmrentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{filmrentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_filmrentals_silver"))

Out[229]: <pyspark.sql.streaming.query.StreamingQuery at 0x7ff8e94a67f0>

In [0]:
%sql
SELECT * FROM fact_filmrentals_silver
LIMIT 5

rental_key,customer_key,staff_key,inventory_key,payment_key,film_key,title,rental_rate,rating,staff_first_name,staff_last_name,staff_email,customer_first_name,customer_last_name,customer_email,payment,p_date,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_quarter,return_year
337,19,1,751,493,164,COAST RAINBOW,0.99,PG,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,RUTH,MARTINEZ,RUTH.MARTINEZ@sakilacustomer.org,2.99,2005-05-27 03:22:30,,,,,,,,,,,,,,
346,14,2,4319,359,941,VIDEOTAPE ARSENIC,4.99,NC-17,Jon,Stephens,Jon.Stephens@sakilastaff.com,BETTY,WHITE,BETTY.WHITE@sakilacustomer.org,9.99,2005-05-27 04:34:41,,,,,,,,,,,,,,
349,36,2,2920,984,641,ORANGE GRAPES,0.99,PG-13,Jon,Stephens,Jon.Stephens@sakilastaff.com,KATHLEEN,ADAMS,KATHLEEN.ADAMS@sakilacustomer.org,0.99,2005-05-27 04:53:11,,,,,,,,,,,,,,
350,9,2,2756,231,604,MULAN MOON,0.99,G,Jon,Stephens,Jon.Stephens@sakilastaff.com,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,4.99,2005-05-27 05:01:28,,,,,,,,,,,,,,
370,22,2,3347,579,735,ROBBERS JOON,2.99,PG-13,Jon,Stephens,Jon.Stephens@sakilastaff.com,LAURA,RODRIGUEZ,LAURA.RODRIGUEZ@sakilacustomer.org,4.99,2005-05-27 07:49:43,,,,,,,,,,,,,,


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_filmrentals_silver

col_name,data_type,comment
rental_key,bigint,
customer_key,bigint,
staff_key,bigint,
inventory_key,bigint,
payment_key,bigint,
film_key,bigint,
title,string,
rental_rate,double,
rating,string,
staff_first_name,string,


##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT c.last_name, c.first_name
  , ROUND(SUM(rs.payment), 3) AS total_payments
  FROM sakila_dlh.`fact_filmrentals_silver` AS rs
  INNER JOIN sakila_dlh.dim_customer AS c
  ON rs.customer_key = c.customer_key
  INNER JOIN sakila_dlh.dim_staff AS s
  ON rs.staff_key = s.staff_key
  WHERE s.staff_key = '2'
  GROUP BY c.last_name, c.first_name

last_name,first_name,total_payments
MOORE,MARGARET,4.99
LOPEZ,AMY,4.99
WILLIAMS,LINDA,1.99
WHITE,BETTY,12.98
SMITH,MARY,2.99
HALL,JESSICA,2.99
ROBINSON,SHARON,6.99
CLARK,MICHELLE,3.99
RODRIGUEZ,LAURA,18.97
GARCIA,CAROL,7.98


#### 9.0. Clean up the File System

In [0]:
#%fs rm -r /FileStore/ds2002-final/