##Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

##Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ds2002-mysql-mguajardo.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : "mguajardo",
  "password" : "looneytOOns25",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sandboxmg.xgonohk"
atlas_database_name = "sakila"
atlas_user_name = "vwn6cz"
atlas_password = "QCtFh9wkdTKjlqu0"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

film_stream_dir = f"{stream_dir}/film"

film_output_bronze = f"{database_dir}/fact_film/bronze"
film_output_silver = f"{database_dir}/fact_film/silver"
film_output_gold   = f"{database_dir}/fact_film/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_film", True) 
# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

##Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result 

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE; 

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Project 2 Database"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Project 2");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-mguajardo.mysql.database.azure.com/sakila", 
  dbtable "dim_date",
  user "mguajardo",    
  password "looneytOOns25"  
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Film Dimension Data from an Azure MySQL database.

In [0]:
%sql
--Create a temporary view named "view_film" that extracts data from your MySQL sakila database
CREATE OR REPLACE TEMPORARY VIEW view_film
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-mguajardo.mysql.database.azure.com:3306/sakila", 
  dbtable "dim_film",
  user "mguajardo",    
  password "looneytOOns25"  
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_film" using data from the view named "view_film"
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_film
COMMENT "Film Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/dim_film"
AS SELECT * FROM view_film

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_key,int,
film_id,int,
title,varchar(128),
description,varchar(65535),
release_year,date,
language_id,int,
original_language_id,int,
rental_duration,int,
rental_rate,"decimal(4,2)",
length,int,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
1001,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006-01-01,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15T05:03:42Z
1002,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006-01-01,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z
1003,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006-01-01,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z
1004,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006-01-01,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15T05:03:42Z
1005,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006-01-01,1,,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15T05:03:42Z


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System


In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/tables/lab_data/retail/batch'

path,name,size,modificationTime
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimCustomers.json,Northwind_DimCustomers.json,10884,1715308221000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimEmployees.csv,Northwind_DimEmployees.csv,2174,1715308221000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimInvoices.json,Northwind_DimInvoices.json,6580,1715308221000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimShippers.csv,Northwind_DimShippers.csv,266,1715308221000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimSuppliers.json,Northwind_DimSuppliers.json,1552,1715308221000
dbfs:/FileStore/lab_data/retail/batch/sakila_DimActor.json,sakila_DimActor.json,155624,1715308249000
dbfs:/FileStore/lab_data/retail/batch/sakila_DimCategory.csv,sakila_DimCategory.csv,26755,1715308249000
dbfs:/FileStore/lab_data/retail/batch/sakila_DimFilm.json,sakila_DimFilm.json,483531,1715308249000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/lab_data/retail/batch/'
json_files = {"actor" : 'sakila_DimActor.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f190c582c40>

##### 2.3.1. Fetch Actor Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._
val userName = "vwn6cz"
val pwd = "QCtFh9wkdTKjlqu0"
val clusterName = "sandboxmg.xgonohk"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala
val df_actor = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila")
.option("collection", "actor").load()
.select("actor_key","first_name","last_name","actor_id","film_id","actor_date_updated_key")

display(df_actor)


actor_key,first_name,last_name,actor_id,film_id,actor_date_updated_key
1,PENELOPE,GUINESS,1,1,20060215
2,PENELOPE,GUINESS,1,23,20060215
3,PENELOPE,GUINESS,1,25,20060215
4,PENELOPE,GUINESS,1,106,20060215
5,PENELOPE,GUINESS,1,140,20060215
6,PENELOPE,GUINESS,1,166,20060215
7,PENELOPE,GUINESS,1,277,20060215
8,PENELOPE,GUINESS,1,361,20060215
9,PENELOPE,GUINESS,1,438,20060215
10,PENELOPE,GUINESS,1,499,20060215


In [0]:
%scala
df_actor.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Actor Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
import org.apache.spark.sql.functions.col

// Cast 'actor_key, actor_id, film_id, actor_date_updated_key' columns to LongType
val df_actor_casted = df_actor
  .withColumn("actor_key", col("actor_key").cast("bigint"))
  .withColumn("actor_id", col("actor_id").cast("bigint"))
  .withColumn("film_id", col("film_id").cast("bigint"))
  .withColumn("actor_date_updated_key", col("actor_date_updated_key").cast("bigint"))

// Attempt to write again using casted DataFrame
df_actor_casted.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_actor")


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_actor

col_name,data_type,comment
actor_key,bigint,
first_name,string,
last_name,string,
actor_id,bigint,
film_id,bigint,
actor_date_updated_key,bigint,
,,
# Delta Statistics Columns,,
Column Names,"film_id, first_name, actor_date_updated_key, last_name, actor_id, actor_key",
Column Selection Method,first-32,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_actor LIMIT 5

actor_key,first_name,last_name,actor_id,film_id,actor_date_updated_key
1,PENELOPE,GUINESS,1,1,20060215
2,PENELOPE,GUINESS,1,23,20060215
3,PENELOPE,GUINESS,1,25,20060215
4,PENELOPE,GUINESS,1,106,20060215
5,PENELOPE,GUINESS,1,140,20060215


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
category_csv = f"{batch_dir}/sakila_DimCategory.csv"

df_category = spark.read.format('csv').options(header='true', inferSchema='true').load(category_csv)
display(df_category)

category_key,category_id,name,film_id,category_date_updated_key
1,1,Action,19,20060214
2,1,Action,21,20060214
3,1,Action,29,20060214
4,1,Action,38,20060214
5,1,Action,56,20060214
6,1,Action,67,20060214
7,1,Action,97,20060214
8,1,Action,105,20060214
9,1,Action,111,20060214
10,1,Action,115,20060214


In [0]:
df_category.printSchema()

root
 |-- category_key: integer (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- film_id: integer (nullable = true)
 |-- category_date_updated_key: integer (nullable = true)



In [0]:
df_category.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_category")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_category;

col_name,data_type,comment
category_key,int,
category_id,int,
name,string,
film_id,int,
category_date_updated_key,int,
,,
# Delta Statistics Columns,,
Column Names,"film_id, name, category_date_updated_key, category_key, category_id",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_category LIMIT 5;

category_key,category_id,name,film_id,category_date_updated_key
1,1,Action,19,20060214
2,1,Action,21,20060214
3,1,Action,29,20060214
4,1,Action,38,20060214
5,1,Action,56,20060214


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_actor,False
sakila_dlh,dim_category,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
,view_date,True
,view_film,True


### Section III: Integrate Reference Data with Real-Time Data
#### 4.0. Use AutoLoader to Process Streaming (Hot Path) Film Fact Data 
##### 4.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "fact_film_key BIGINT")
 #.option("cloudFiles.schemaHints", "film_id BIGINT")
 #.option("cloudFiles.schemaHints", "title TEXT")
 #.option("cloudFiles.schemaHints", "release_year BIGINT") 
 #.option("cloudFiles.schemaHints", "rental_duration BIGINT")
 #.option("cloudFiles.schemaHints", "rental_rate DOUBLE")
 #.option("cloudFiles.schemaHints", "length BIGINT")
 #.option("cloudFiles.schemaHints", "replacement_cost DOUBLE")
 #.option("cloudFiles.schemaHints", "rating TEXT") 
 #.option("cloudFiles.schemaHints", "actor_key DOUBLE")
 #.option("cloudFiles.schemaHints", "actor_id DOUBLE")
 #.option("cloudFiles.schemaHints", "first_name TEXT")
 #.option("cloudFiles.schemaHints", "last_name TEXT")
 #.option("cloudFiles.schemaHints", "category_key BIGINT")
 #.option("cloudFiles.schemaHints", "category_id BIGINT")
 #.option("cloudFiles.schemaHints", "name TEXT")
 #.option("cloudFiles.schemaHints", "category_date_updated_key BIGINT")
 #.option("cloudFiles.schemaHints", "actor_date_updated_key DOUBLE"),
 #.option("cloudFiles.schemaHints", "film_date_updated_key BIGINT")
 .option("cloudFiles.schemaLocation", film_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(film_stream_dir)
 .createOrReplaceTempView("film_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW film_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM film_raw_tempview
)

In [0]:
%sql
SELECT * FROM film_bronze_tempview

actor_date_updated_key,actor_id,actor_key,category_date_updated_key,category_id,category_key,fact_film_key,film_date_updated_key,film_id,first_name,last_name,length,name,rating,release_year,rental_duration,rental_rate,replacement_cost,title,_rescued_data,receipt_time,source_file
,,,20060214,14,853,925,20060215,655,,,109,Sci-Fi,NC-17,2006,5,4.99,22.99,PANTHER REDS,,2024-05-10T16:46:33.232Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film03.json
,,,20060214,16,982,926,20060215,656,,,128,Travel,PG,2006,3,0.99,9.99,PAPI NECKLACE,,2024-05-10T16:46:33.232Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film03.json
,,,20060214,5,285,927,20060215,657,,,48,Comedy,PG-13,2006,5,2.99,12.99,PARADISE SABRINA,,2024-05-10T16:46:33.232Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film03.json
,,,20060214,11,670,928,20060215,658,,,121,Horror,PG-13,2006,7,2.99,19.99,PARIS WEEKEND,,2024-05-10T16:46:33.232Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film03.json
,,,20060214,1,43,929,20060215,659,,,109,Action,PG-13,2006,3,4.99,14.99,PARK CITIZEN,,2024-05-10T16:46:33.232Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film03.json
,,,20060214,5,286,930,20060215,660,,,107,Comedy,PG,2006,7,2.99,11.99,PARTY KNOCK,,2024-05-10T16:46:33.232Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film03.json
,,,20060214,9,557,931,20060215,661,,,157,Foreign,PG-13,2006,5,4.99,17.99,PAST SUICIDES,,2024-05-10T16:46:33.232Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film03.json
20060215.0,37.0,955.0,20060214,7,410,932,20060215,662,VAL,BOLGER,118,Drama,PG,2006,3,4.99,9.99,PATHS CONTROL,,2024-05-10T16:46:33.232Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film03.json
20060215.0,20.0,512.0,20060214,4,230,933,20060215,663,LUCILLE,TRACY,99,Classics,NC-17,2006,7,0.99,29.99,PATIENT SISTER,,2024-05-10T16:46:33.232Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film03.json
20060215.0,37.0,956.0,20060214,4,230,934,20060215,663,VAL,BOLGER,99,Classics,NC-17,2006,7,0.99,29.99,PATIENT SISTER,,2024-05-10T16:46:33.232Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film03.json


In [0]:
(spark.table("film_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{film_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_film_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f18ecb643a0>

##### 4.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_film_bronze")
  .createOrReplaceTempView("film_silver_tempview"))

In [0]:
%sql
SELECT * FROM film_silver_tempview

actor_date_updated_key,actor_id,actor_key,category_date_updated_key,category_id,category_key,fact_film_key,film_date_updated_key,film_id,first_name,last_name,length,name,rating,release_year,rental_duration,rental_rate,replacement_cost,title,_rescued_data,receipt_time,source_file
20060215.0,1.0,1.0,20060214,6,306,1,20060215,1,PENELOPE,GUINESS,86,Documentary,PG,2006,6,0.99,20.99,ACADEMY DINOSAUR,,2024-05-10T16:46:41.211Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film01.json
20060215.0,10.0,213.0,20060214,6,306,2,20060215,1,CHRISTIAN,GABLE,86,Documentary,PG,2006,6,0.99,20.99,ACADEMY DINOSAUR,,2024-05-10T16:46:41.211Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film01.json
20060215.0,20.0,490.0,20060214,6,306,3,20060215,1,LUCILLE,TRACY,86,Documentary,PG,2006,6,0.99,20.99,ACADEMY DINOSAUR,,2024-05-10T16:46:41.211Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film01.json
20060215.0,30.0,781.0,20060214,6,306,4,20060215,1,SANDRA,PECK,86,Documentary,PG,2006,6,0.99,20.99,ACADEMY DINOSAUR,,2024-05-10T16:46:41.211Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film01.json
20060215.0,19.0,465.0,20060214,11,639,5,20060215,2,BOB,FAWCETT,48,Horror,G,2006,3,4.99,12.99,ACE GOLDFINGER,,2024-05-10T16:46:41.211Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film01.json
20060215.0,2.0,20.0,20060214,6,307,6,20060215,3,NICK,WAHLBERG,50,Documentary,NC-17,2006,7,2.99,18.99,ADAPTATION HOLES,,2024-05-10T16:46:41.211Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film01.json
20060215.0,19.0,466.0,20060214,6,307,7,20060215,3,BOB,FAWCETT,50,Documentary,NC-17,2006,7,2.99,18.99,ADAPTATION HOLES,,2024-05-10T16:46:41.211Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film01.json
20060215.0,24.0,610.0,20060214,6,307,8,20060215,3,CAMERON,STREEP,50,Documentary,NC-17,2006,7,2.99,18.99,ADAPTATION HOLES,,2024-05-10T16:46:41.211Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film01.json
,,,20060214,11,640,9,20060215,4,,,117,Horror,G,2006,5,2.99,26.99,AFFAIR PREJUDICE,,2024-05-10T16:46:41.211Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film01.json
,,,20060214,8,436,10,20060215,5,,,130,Family,G,2006,6,2.99,22.99,AFRICAN EGG,,2024-05-10T16:46:41.211Z,dbfs:/FileStore/lab_data/retail/stream/film/sakila_fact_film01.json


In [0]:
%sql
DESCRIBE EXTENDED film_silver_tempview

col_name,data_type,comment
actor_date_updated_key,bigint,
actor_id,bigint,
actor_key,bigint,
category_date_updated_key,bigint,
category_id,bigint,
category_key,bigint,
fact_film_key,bigint,
film_date_updated_key,bigint,
film_id,bigint,
first_name,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_film_silver_tempview AS(
  SELECT f.fact_film_key,
        f.film_id,
        f.title,
        f.release_year,
        f.rental_duration,
        f.rental_rate,
        f.length,
        f.replacement_cost,
        f.rating,
        f.actor_key,
        f.actor_id,
        a.first_name AS actor_first_name,
        a.last_name AS actor_last_name,
        f.category_key,
        f.category_id,
        c.name AS category_name,
        f.category_date_updated_key,
        f.actor_date_updated_key,
        f.film_date_updated_key
FROM film_silver_tempview AS f
INNER JOIN sakila_dlh.dim_actor AS a
ON f.actor_key = a.actor_key
INNER JOIN sakila_dlh.dim_category AS c
ON f.category_key = c.category_key
INNER JOIN sakila_dlh.dim_date AS d
ON f.film_date_updated_key = d.date_key
INNER JOIN sakila_dlh.dim_film AS o
ON f.film_id = o.film_id
)

In [0]:
(spark.table("fact_film_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{film_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_film_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f18ecb65ba0>

In [0]:
%sql
SELECT * FROM fact_film_silver

fact_film_key,film_id,title,release_year,rental_duration,rental_rate,length,replacement_cost,rating,actor_key,actor_id,actor_first_name,actor_last_name,category_key,category_id,category_name,category_date_updated_key,actor_date_updated_key,film_date_updated_key
1,1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99,PG,1,1,PENELOPE,GUINESS,306,6,Documentary,20060214,20060215,20060215
2,1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99,PG,213,10,CHRISTIAN,GABLE,306,6,Documentary,20060214,20060215,20060215
3,1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99,PG,490,20,LUCILLE,TRACY,306,6,Documentary,20060214,20060215,20060215
4,1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99,PG,781,30,SANDRA,PECK,306,6,Documentary,20060214,20060215,20060215
5,2,ACE GOLDFINGER,2006,3,4.99,48,12.99,G,465,19,BOB,FAWCETT,639,11,Horror,20060214,20060215,20060215
6,3,ADAPTATION HOLES,2006,7,2.99,50,18.99,NC-17,20,2,NICK,WAHLBERG,307,6,Documentary,20060214,20060215,20060215
7,3,ADAPTATION HOLES,2006,7,2.99,50,18.99,NC-17,466,19,BOB,FAWCETT,307,6,Documentary,20060214,20060215,20060215
8,3,ADAPTATION HOLES,2006,7,2.99,50,18.99,NC-17,610,24,CAMERON,STREEP,307,6,Documentary,20060214,20060215,20060215
11,6,AGENT TRUMAN,2006,3,2.99,169,17.99,PG,520,21,KIRSTEN,PALTROW,505,9,Foreign,20060214,20060215,20060215
12,6,AGENT TRUMAN,2006,3,2.99,169,17.99,PG,573,23,SANDRA,KILMER,505,9,Foreign,20060214,20060215,20060215


In [0]:
%sql
DESCRIBE EXTENDED fact_film_silver

col_name,data_type,comment
fact_film_key,bigint,
film_id,bigint,
title,string,
release_year,bigint,
rental_duration,bigint,
rental_rate,double,
length,bigint,
replacement_cost,double,
rating,string,
actor_key,bigint,


##### 4.3. Gold Table: Perform Aggregations

In [0]:
%sql
-- The total number of movies grouped by actor 
CREATE OR REPLACE TABLE sakila_dlh.fact_film_gold AS (
      SELECT actor_first_name 
        , actor_last_name 
        , COUNT(title) AS Number_of_Movies
    FROM sakila_dlh.fact_film_silver
    GROUP BY actor_first_name
        , actor_last_name
    ORDER BY Number_of_Movies DESC);


num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM sakila_dlh.fact_film_gold;

actor_first_name,actor_last_name,Number_of_Movies
SANDRA,KILMER,37
UMA,WOOD,35
VAL,BOLGER,35
JULIA,MCQUEEN,33
RIP,CRAWFORD,33
HELEN,VOIGHT,32
WOODY,HOFFMAN,31
KARL,BERRY,31
VIVIEN,BERGEN,30
LUCILLE,TRACY,30


#### 5.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/lab_data/