## Project 2 Capstone
This project will implement Data Lakehouse architecture using Databricks Spark Structured Streaming & Delta Tables technologies. Data from a rational database (SQL), a NoSQL database (MongoDB), and cloud file system data (CSV) will be extracted, transformed, and loaded into a destination system implementing hot path and cold path data.

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd 
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables for Database

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ds2002-mysql-stokes.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "sstokes001",
  "password" : "Rw459484#",
  "driver" : "org.mariadb.jdbc.Driver"
}

atlas_cluster_name = "stokesclsuter.gcl0xol"
atlas_database_name = "sakila_dw"
atlas_user_name = "sstokes002"
atlas_password = "Rw459484"

dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/Project2"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/data"

rental_dir = f"{base_dir}/data/stream"


rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

#### 3.0. Define Global Functions for MongoDB

In [0]:
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Ingesting Reference (Cold-path) Data 
First getting reference Data From an Azure MySQL Database and creating a new databricks metadata database called sakila_dlh.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "Project 2 Database"
LOCATION "dbfs:/FileStore/Project2/sakila_dlh";

##### Creating a New Table That Sources Date Dimension Data From a Table in an Azure Mysql Database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-stokes.mysql.database.azure.com:3306/sakila_dw",
  dbtable "dim_date",
  user "sstokes001",
  password "Rw459484#"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/Project2/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### Create a New Table That Sources Product Dimension Data (Films) From an Azure Mysql Database

In [0]:
%sql

CREATE OR REPLACE TEMPORARY VIEW view_film
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-stokes.mysql.database.azure.com:3306/sakila_dw",
  dbtable "dim_films",
  user "sstokes001",
  password "Rw459484#"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_films
COMMENT "Films Dimension Table"
LOCATION "dbfs:/FileStore/Project2/sakila_dlh/dim_films"
AS SELECT * FROM view_film

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_films;

col_name,data_type,comment
film_key,bigint,
title,string,
description,string,
release_year,string,
language_id,bigint,
original_language_id,bigint,
rental_duration,bigint,
rental_rate,double,
length,bigint,
replacement_cost,double,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_films LIMIT 5

film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,0,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,0,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,0,6,2.99,130,22.99,G,Deleted Scenes


#### Getting Reference Data from a MongoDB Atlas Database
First, ensuring that the data files are in the proper place.

In [0]:
display(dbutils.fs.ls(data_dir))

path,name,size,modificationTime
dbfs:/FileStore/Project2/data/rentals/,rentals/,0,1683669147000
dbfs:/FileStore/Project2/data/sakila_ad.json,sakila_ad.json,79578,1683657852000
dbfs:/FileStore/Project2/data/sakila_customer3.csv,sakila_customer3.csv,38654,1683672244000
dbfs:/FileStore/Project2/data/sakila_sto.json,sakila_sto.json,111,1683657871000


##### Creating a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/Project2/data/batch'
json_files = {"film_stores" : 'sakila_sto.json', "addresses" : 'sakila_ad.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[50]: <pymongo.results.InsertManyResult at 0x7f82157573c0>

##### Getting Store Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_film_stores = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw").option("collection", "film_stores").load()
.select("store_key","manager_staff_id","address_id")

display(df_film_stores)

store_key,manager_staff_id,address_id
1,1,1
2,2,2


In [0]:
%scala
df_film_stores.printSchema()


##### Using the Spark DataFrame to Create a New Store Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_film_stores.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film_stores")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film_stores

col_name,data_type,comment
store_key,int,
manager_staff_id,int,
address_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_film_stores,
Type,MANAGED,
Location,dbfs:/FileStore/Project2/sakila_dlh/dim_film_stores,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film_stores LIMIT 5

store_key,manager_staff_id,address_id
1,1,1
2,2,2


The same steps will be repeated again, only this time to create a dim_address table to be added to sakila_dlh.

In [0]:
%scala
import com.mongodb.spark._

val df_addresses = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw").option("collection", "addresses").load()
.select("address_key","address","address2","district","city_id","postal_code")

display(df_addresses)

address_key,address,address2,district,city_id,postal_code
1,47 MySakila Drive,,Alberta,300,
2,28 MySQL Boulevard,,QLD,576,
3,23 Workhaven Lane,,Alberta,300,
4,1411 Lillydale Drive,,QLD,576,
5,1913 Hanoi Way,,Nagasaki,463,35200.0
6,1121 Loja Avenue,,California,449,17886.0
7,692 Joliet Street,,Attika,38,83579.0
8,1566 Inegl Manor,,Mandalay,349,53561.0
9,53 Idfu Parkway,,Nantou,361,42399.0
10,1795 Santiago de Compostela Way,,Texas,295,18743.0


In [0]:
%scala
df_addresses.printSchema()

In [0]:
%scala
df_addresses.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_addresses")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_addresses

col_name,data_type,comment
address_key,int,
address,string,
address2,string,
district,string,
city_id,int,
postal_code,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_addresses LIMIT 5

address_key,address,address2,district,city_id,postal_code
1,47 MySakila Drive,,Alberta,300,
2,28 MySQL Boulevard,,QLD,576,
3,23 Workhaven Lane,,Alberta,300,
4,1411 Lillydale Drive,,QLD,576,
5,1913 Hanoi Way,,Nagasaki,463,35200.0


#### Getting Data from a File System
PySpark will be used to read from a CSV File from the file path.

In [0]:
customers_csv = "dbfs:/FileStore/Project2/data/sakila_customer3.csv"

df_customers = spark.read.format('csv').options(header='true', inferSchema='true').load(customers_csv)
display(df_customers)

customer_key,store_id,first_name,last_name,email,address_id,active
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1


In [0]:
df_customers.printSchema()

root
 |-- customer_key: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- active: integer (nullable = true)



In [0]:
df_customers.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customers")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customers;

col_name,data_type,comment
customer_key,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customers LIMIT 5;

customer_key,store_id,first_name,last_name,email,address_id,active
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1


##### Verify Dimension Tables in the sakila_dlh metadata database

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_addresses,False
sakila_dlh,dim_customers,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film_stores,False
sakila_dlh,dim_films,False
,display_query_1,True
,display_query_2,True
,rentals_bronze_tempview,True
,rentals_raw_tempview,True
,rentals_silver_tempview,True


#### Using AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
First creating a bronze table to process the 'raw' JSON data and creating a temporary view.

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "rental_key BIGINT")
 .option("cloudFiles.schemaHints", "rental_id BIGINT")
 .option("cloudFiles.schemaHints", "inventory_id BIGINT")
 .option("cloudFiles.schemaHints", "customer_id BIGINT") 
 .option("cloudFiles.schemaHints", "payment_id BIGINT")
 .option("cloudFiles.schemaHints", "film_id BIGINT")
 .option("cloudFiles.schemaHints", "store_id BIGINT")
 .option("cloudFiles.schemaHints", "amount DECIMAL")
 .option("cloudFiles.schemaHints", "payment_date_key DECIMAL") 
 .option("cloudFiles.schemaHints", "rental_date_key DECIMAL")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rental_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [0]:
%sql

CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

amount,customer_id,film_id,inventory_id,payment_date_key,payment_id,rental_date_key,rental_id,rental_key,store_id,_rescued_data,receipt_time,source_file
2.99,130,80,367,0.0,3504,0,1,1,1,,2023-05-09T22:53:03.081+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
3.99,327,80,367,0.0,8828,0,1577,2,1,,2023-05-09T22:53:03.081+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,207,80,367,0.0,5603,0,3584,3,1,,2023-05-09T22:53:03.081+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,45,80,367,0.0,1244,0,10507,4,1,,2023-05-09T22:53:03.081+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,281,80,367,0.0,7623,0,13641,5,1,,2023-05-09T22:53:03.081+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,459,333,1525,0.0,12377,0,2,6,2,,2023-05-09T22:53:03.081+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,471,333,1525,0.0,12724,0,1449,7,2,,2023-05-09T22:53:03.081+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,127,333,1525,0.0,3421,0,5499,8,2,,2023-05-09T22:53:03.081+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,231,333,1525,0.0,6233,0,9711,9,2,,2023-05-09T22:53:03.081+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,567,333,1525,0.0,15205,0,13031,10,2,,2023-05-09T22:53:03.081+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

Out[59]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f8214819400>

##### Creating a Silver Table

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

amount,customer_id,film_id,inventory_id,payment_date_key,payment_id,rental_date_key,rental_id,rental_key,store_id,_rescued_data,receipt_time,source_file
2.99,130,80,367,0.0,3504,0,1,1,1,,2023-05-09T22:53:15.933+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
3.99,327,80,367,0.0,8828,0,1577,2,1,,2023-05-09T22:53:15.933+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,207,80,367,0.0,5603,0,3584,3,1,,2023-05-09T22:53:15.933+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,45,80,367,0.0,1244,0,10507,4,1,,2023-05-09T22:53:15.933+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,281,80,367,0.0,7623,0,13641,5,1,,2023-05-09T22:53:15.933+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,459,333,1525,0.0,12377,0,2,6,2,,2023-05-09T22:53:15.933+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,471,333,1525,0.0,12724,0,1449,7,2,,2023-05-09T22:53:15.933+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,127,333,1525,0.0,3421,0,5499,8,2,,2023-05-09T22:53:15.933+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,231,333,1525,0.0,6233,0,9711,9,2,,2023-05-09T22:53:15.933+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json
2.99,567,333,1525,0.0,15205,0,13031,10,2,,2023-05-09T22:53:15.933+0000,dbfs:/FileStore/Project2/data/rentals/fact_rental.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
amount,double,
customer_id,bigint,
film_id,bigint,
inventory_id,bigint,
payment_date_key,double,
payment_id,bigint,
rental_date_key,"decimal(10,0)",
rental_id,bigint,
rental_key,bigint,
store_id,bigint,


##### Selecting Columns the Tempview and Joining Them to Other Tables

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.rental_key,
      r.rental_id,
      r.inventory_id,
      r.customer_id,
      r.payment_id,
      r.film_id,
      r.store_id,
      r.amount,
      r.rental_date_key,
      c.first_name,
      c.last_name,
      od.date_key
  FROM rentals_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_film_stores AS s
  ON s.store_key = r.store_id
  INNER JOIN sakila_dlh.dim_customers AS c 
  ON c.customer_key = r.customer_id
  INNER JOIN sakila_dlh.dim_films AS f
  ON f.film_key = r.film_id
  LEFT OUTER JOIN sakila_dlh.dim_date AS od
  ON od.date_key = r.rental_date_key
)

In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

Out[66]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f8214803df0>

In [0]:
%sql
SELECT * FROM fact_rentals_silver

rental_key,rental_id,inventory_id,customer_id,payment_id,film_id,store_id,amount,rental_date_key,first_name,last_name,date_key


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
rental_key,bigint,
rental_id,bigint,
inventory_id,bigint,
customer_id,bigint,
payment_id,bigint,
film_id,bigint,
store_id,bigint,
amount,double,
rental_date_key,"decimal(10,0)",
first_name,string,


##### Creating a Gold Table and Adding Visualizations Based on the Largest Amount Each Customer Spent on Rentals (Only Top 10)

In [0]:

%sql
CREATE OR REPLACE TEMPORARY VIEW customer_film_count AS
  SELECT customer_id, count(amount) AS customer_count
  FROM fact_rentals_silver_tempview
  GROUP BY customer_id
  ORDER BY customer_count DESC;

In [0]:
customers_count_checkpoint_path = f"{database_dir}/customers_counts"

query = (spark.table("customer_film_count")
              .writeStream
              .format("delta")
              .option("checkpointLocation", customers_count_checkpoint_path)
              .outputMode("complete")
              .table("gold_customer_film_count"))

In [0]:
%sql
SELECT * FROM gold_customer_film_count LIMIT 10;

customer_id,customer_count
541,24
558,28
418,30
191,20
65,22
270,25
222,21
293,31
29,36
26,34


Databricks visualization. Run in Databricks to view.