### Setting up environment and variables

##### 1.1 Import required libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

##### 1.2 Instantiate global variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "erv2bd-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "kpunsalan",
  "password" : "Sienameow!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0KP.vcr5b"
atlas_database_name = "sakila_dw"
atlas_user_name = "kpunsalan"
atlas_password = "GoHoos123"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/final_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/rentals"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rentals_stream_dir = f"{stream_dir}/rentals"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

##### 1.3 Define global functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
##################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Create a JDBC URL to the Azure MYSQL Database'''
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"
    
    '''Invoke the spark.read.jdbc() function to query tghe database, and fill a Pandas DataFrame'''
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)

    return dframe

##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Capstone Database"
LOCATION "dbfs:/FileStore/final_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://erv2bd-mysql.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "kpunsalan",    --Replace with your User Name
  password "Sienameow!"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20050524,2005-05-24,2005/05/24,05/24/2005,24/05/2005,3,Tuesday,24,144,Weekday,21,May,5,N,2,2005,2005-05,2005Q2,11,4,2005,2005-11,2005Q4
20050525,2005-05-25,2005/05/25,05/25/2005,25/05/2005,4,Wednesday,25,145,Weekday,21,May,5,N,2,2005,2005-05,2005Q2,11,4,2005,2005-11,2005Q4
20050526,2005-05-26,2005/05/26,05/26/2005,26/05/2005,5,Thursday,26,146,Weekday,21,May,5,N,2,2005,2005-05,2005Q2,11,4,2005,2005-11,2005Q4
20050527,2005-05-27,2005/05/27,05/27/2005,27/05/2005,6,Friday,27,147,Weekday,21,May,5,N,2,2005,2005-05,2005Q2,11,4,2005,2005-11,2005Q4
20050528,2005-05-28,2005/05/28,05/28/2005,28/05/2005,7,Saturday,28,148,Weekend,21,May,5,N,2,2005,2005-05,2005Q2,11,4,2005,2005-11,2005Q4


##### 1.3. Create a New Table that Sources Film Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_film" that extracts data from sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_film
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://erv2bd-mysql.mysql.database.azure.com:3306/sakila_dw",
  dbtable "dim_film",
  user "kpunsalan", 
  password "Sienameow!"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_film" using data from the view named "view_film"
CREATE OR REPLACE TABLE sakila_dlh.dim_film
COMMENT "Films Dimension Table"
LOCATION "dbfs:/FileStore/final_data/sakila_dlh/dim_film"
AS SELECT * FROM view_film

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_key,bigint,
film_id,bigint,
film_title,varchar(65535),
rental_rate,double,
rating,varchar(65535),
film_category_key,bigint,
category_id,bigint,
category_key,bigint,
category_name,varchar(65535),
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_key,film_id,film_title,rental_rate,rating,film_category_key,category_id,category_key,category_name
1,1,ACADEMY DINOSAUR,0.99,PG,1,6,6,Documentary
2,2,ACE GOLDFINGER,4.99,G,2,11,11,Horror
3,3,ADAPTATION HOLES,2.99,NC-17,3,6,6,Documentary
4,4,AFFAIR PREJUDICE,2.99,G,4,11,11,Horror
5,5,AFRICAN EGG,2.99,G,5,8,8,Family


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/lab_data/retail/batch'

path,name,size,modificationTime
dbfs:/FileStore/final_data/rentals/batch/dim_customer.csv,dim_customer.csv,15335,1733436269000
dbfs:/FileStore/final_data/rentals/batch/dim_customer.json,dim_customer.json,51819,1733436254000
dbfs:/FileStore/final_data/rentals/batch/dim_film.csv,dim_film.csv,54290,1733436479000
dbfs:/FileStore/final_data/rentals/batch/dim_film.json,dim_film.json,188167,1733436240000
dbfs:/FileStore/final_data/rentals/batch/dim_film_category.csv,dim_film_category.csv,22189,1733436424000
dbfs:/FileStore/final_data/rentals/batch/dim_film_category.json,dim_film_category.json,105113,1733436247000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/final_data/rentals/batch'
json_files = {"categories" : 'dim_film_category.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7fc06012c800>

##### 2.3.1. Fetch Category Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "kpunsalan"
val pwd = "GoHoos123"
val clusterName = "Cluster0KP.vcr5b"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_category = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw")
.option("collection", "categories").load()

.select("film_category_key","film_id","category_id","category_key","category_name")

display(df_category)

film_category_key,film_id,category_id,category_key,category_name
1,1,6,6,Documentary
2,2,11,11,Horror
3,3,6,6,Documentary
4,4,11,11,Horror
5,5,8,8,Family
6,6,9,9,Foreign
7,7,5,5,Comedy
8,8,11,11,Horror
9,9,11,11,Horror
10,10,15,15,Sports


In [0]:
%scala
df_category.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Category Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_category.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_category")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_category

col_name,data_type,comment
film_category_key,int,
film_id,int,
category_id,int,
category_key,int,
category_name,string,
,,
# Delta Statistics Columns,,
Column Names,"film_id, category_key, category_name, category_id, film_category_key",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_category LIMIT 5

film_category_key,film_id,category_id,category_key,category_name
1,1,6,6,Documentary
2,2,11,11,Horror
3,3,6,6,Documentary
4,4,11,11,Horror
5,5,8,8,Family


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
customer_csv = f"{batch_dir}/dim_customer.csv"

df_customer = spark.read.format('csv').options(header='true', inferSchema='true').load(customer_csv)
display(df_customer)

customer_key,customer_id,first_name,last_name
1,1,MARY,SMITH
2,2,PATRICIA,JOHNSON
3,3,LINDA,WILLIAMS
4,4,BARBARA,JONES
5,5,ELIZABETH,BROWN
6,6,JENNIFER,DAVIS
7,7,MARIA,MILLER
8,8,SUSAN,WILSON
9,9,MARGARET,MOORE
10,10,DOROTHY,TAYLOR


In [0]:
df_customer.printSchema()

root
 |-- customer_key: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)



In [0]:
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,int,
customer_id,int,
first_name,string,
last_name,string,
,,
# Delta Statistics Columns,,
Column Names,"customer_key, customer_id, first_name, last_name",
Column Selection Method,first-32,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5;

customer_key,customer_id,first_name,last_name
1,1,MARY,SMITH
2,2,PATRICIA,JOHNSON
3,3,LINDA,WILLIAMS
4,4,BARBARA,JONES
5,5,ELIZABETH,BROWN


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_category,False
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
,_sqldf,True
,view_date,True
,view_film,True


### Section III: Integrate Reference Data with Real-Time Data
#### Use AutoLoader to Process Streaming (Hot Path) Rentals Fact Data 
##### Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

amount,customer_key,fact_rental_key,film_key,inventory_id,payment_date_key,payment_id,rental_date_key,rental_id,return_date_key,_rescued_data,receipt_time,source_file
3.99,538,10697,863,3958,20050801,14498,20050801,10701,20050809.0,,2024-12-07T20:47:43.986Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
4.99,560,10698,615,2802,20050801,15009,20050801,10702,20050809.0,,2024-12-07T20:47:43.986Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
2.99,181,10699,395,1818,20050801,4933,20050801,10703,20050807.0,,2024-12-07T20:47:43.986Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
8.99,594,10700,214,960,20050801,15923,20050801,10704,20050808.0,,2024-12-07T20:47:43.986Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
0.99,381,10701,945,4338,20050801,10330,20050801,10705,20050804.0,,2024-12-07T20:47:43.986Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
7.99,147,10702,263,1183,20050801,3997,20050801,10706,20050810.0,,2024-12-07T20:47:43.986Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
0.99,558,10703,258,1165,20050801,14955,20050801,10707,20050806.0,,2024-12-07T20:47:43.986Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
7.99,567,10704,867,3978,20050801,15202,20050801,10708,20050809.0,,2024-12-07T20:47:43.986Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
0.99,418,10705,63,282,20050801,11309,20050801,10709,20050806.0,,2024-12-07T20:47:43.986Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
0.99,177,10706,677,3082,20050801,4814,20050801,10710,20050803.0,,2024-12-07T20:47:43.986Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fc3c4c2fe10>

##### Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

amount,customer_key,fact_rental_key,film_key,inventory_id,payment_date_key,payment_id,rental_date_key,rental_id,return_date_key,_rescued_data,receipt_time,source_file
3.99,538,10697,863,3958,20050801,14498,20050801,10701,20050809.0,,2024-12-07T20:48:07.161Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
4.99,560,10698,615,2802,20050801,15009,20050801,10702,20050809.0,,2024-12-07T20:48:07.161Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
2.99,181,10699,395,1818,20050801,4933,20050801,10703,20050807.0,,2024-12-07T20:48:07.161Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
8.99,594,10700,214,960,20050801,15923,20050801,10704,20050808.0,,2024-12-07T20:48:07.161Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
0.99,381,10701,945,4338,20050801,10330,20050801,10705,20050804.0,,2024-12-07T20:48:07.161Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
7.99,147,10702,263,1183,20050801,3997,20050801,10706,20050810.0,,2024-12-07T20:48:07.161Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
0.99,558,10703,258,1165,20050801,14955,20050801,10707,20050806.0,,2024-12-07T20:48:07.161Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
7.99,567,10704,867,3978,20050801,15202,20050801,10708,20050809.0,,2024-12-07T20:48:07.161Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
0.99,418,10705,63,282,20050801,11309,20050801,10709,20050806.0,,2024-12-07T20:48:07.161Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json
0.99,177,10706,677,3082,20050801,4814,20050801,10710,20050803.0,,2024-12-07T20:48:07.161Z,dbfs:/FileStore/final_data/rentals/stream/rentals/fact_rentals3.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
amount,double,
customer_key,bigint,
fact_rental_key,bigint,
film_key,bigint,
inventory_id,bigint,
payment_date_key,bigint,
payment_id,bigint,
rental_date_key,bigint,
rental_id,bigint,
return_date_key,double,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.fact_rental_key,
      r.rental_id,
      r.payment_id,
      r.customer_key,
      r.film_key,
      r.inventory_id,
      r.rental_date_key,
      r.return_date_key,
      r.payment_date_key,
      r.amount,

      c.last_name AS customer_last_name,
      c.first_name AS customer_first_name,

      f.film_title,
      f.rating AS film_rating,
      f.category_name AS film_category,

      rd.day_name_of_week AS rental_day_name_of_week,
      rd.day_of_month AS rental_day_of_month,
      rd.weekday_weekend AS rental_weekday_weekend,
      rd.month_name AS rental_month_name,
      rd.calendar_quarter AS rental_quarter,
      rd.calendar_year AS rental_year
  FROM rentals_silver_tempview AS r

  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = r.customer_key

  INNER JOIN sakila_dlh.dim_film AS f
  ON f.film_key = r.film_key

  LEFT OUTER JOIN sakila_dlh.dim_date AS rd
  ON rd.date_key = r.rental_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS retd
  ON retd.date_key = r.return_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS pd
  ON pd.date_key = r.payment_date_key
)

In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fc3c4c4f990>

In [0]:
%sql
SELECT * FROM fact_rentals_silver

fact_rental_key,rental_id,payment_id,customer_key,film_key,inventory_id,rental_date_key,return_date_key,payment_date_key,amount,customer_last_name,customer_first_name,film_title,film_rating,film_category,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year
10697,10701,14498,538,863,3958,20050801,20050809.0,20050801,3.99,BREAUX,TED,SUN CONFESSIONS,R,Sci-Fi,Monday,1,Weekday,August,3,2005
10698,10702,15009,560,615,2802,20050801,20050809.0,20050801,4.99,ARCHULETA,JORDAN,NASH CHOCOLAT,PG-13,Animation,Monday,1,Weekday,August,3,2005
10699,10703,4933,181,395,1818,20050801,20050807.0,20050801,2.99,BRADLEY,ANA,HANDICAP BOONDOCK,R,Action,Monday,1,Weekday,August,3,2005
10700,10704,15923,594,214,960,20050801,20050808.0,20050801,8.99,HIATT,EDUARDO,DAUGHTER MADIGAN,PG-13,Children,Monday,1,Weekday,August,3,2005
10701,10705,10330,381,945,4338,20050801,20050804.0,20050801,0.99,BOUDREAU,BOBBY,VIRGINIAN PLUTO,R,Documentary,Monday,1,Weekday,August,3,2005
10702,10706,3997,147,263,1183,20050801,20050810.0,20050801,7.99,ROBERTSON,JOANNE,DURHAM PANKY,R,Sports,Monday,1,Weekday,August,3,2005
10703,10707,14955,558,258,1165,20050801,20050806.0,20050801,0.99,EGGLESTON,JIMMIE,DRUMS DYNAMITE,PG,Horror,Monday,1,Weekday,August,3,2005
10704,10708,15202,567,867,3978,20050801,20050809.0,20050801,7.99,MCADAMS,ALFREDO,SUPER WYOMING,PG,Family,Monday,1,Weekday,August,3,2005
10705,10709,11309,418,63,282,20050801,20050806.0,20050801,0.99,EAST,JEFF,BEDAZZLED MARRIED,PG,Family,Monday,1,Weekday,August,3,2005
10706,10710,4814,177,677,3082,20050801,20050803.0,20050801,0.99,DUNCAN,SAMANTHA,PIANIST OUTFIELD,NC-17,New,Monday,1,Weekday,August,3,2005


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
fact_rental_key,bigint,
rental_id,bigint,
payment_id,bigint,
customer_key,bigint,
film_key,bigint,
inventory_id,bigint,
rental_date_key,bigint,
return_date_key,double,
payment_date_key,bigint,
amount,double,


##### Gold Table: Perform Aggregations
I wrote a query for a gold table that shows the number of film rentals per customer each month, with the customer id, first and last name of each customer, and the month in which the rental occurred.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_monthly_rentals_by_customer_gold AS (
  SELECT customer_key AS CustomerID
    , customer_last_name AS LastName
    , customer_first_name AS FirstName
    , rental_month_name AS RentalMonth
    , COUNT(film_key) AS FilmCount
  FROM sakila_dlh.fact_rentals_silver
  GROUP BY CustomerID, LastName, FirstName, RentalMonth
  ORDER BY FilmCount DESC);

SELECT * FROM sakila_dlh.fact_monthly_rentals_by_customer_gold;

CustomerID,LastName,FirstName,RentalMonth,FilmCount
148,HUNT,ELEANOR,July,22
102,FORD,CRYSTAL,July,21
236,DEAN,MARCIA,July,20
75,SANDERS,TAMMY,July,20
64,COX,JUDITH,July,19
354,NGO,JUSTIN,July,19
91,BUTLER,LOIS,July,19
526,SEAL,KARL,July,19
30,KING,MELISSA,July,19
366,HUEY,BRANDON,July,19


This second Gold table query shows the number of films rented per genre per month.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_monthly_rentals_by_genre_gold AS (
  SELECT film_category AS Genre
    , rental_month_name AS RentalMonth
    , COUNT(film_key) AS FilmCount
  FROM sakila_dlh.fact_rentals_silver
  GROUP BY Genre, RentalMonth
  ORDER BY FilmCount DESC);

SELECT * FROM sakila_dlh.fact_monthly_rentals_by_genre_gold;

Genre,RentalMonth,FilmCount
Sports,July,497
Animation,July,489
Action,July,464
Drama,July,463
Sci-Fi,July,462
Family,July,460
Sports,August,432
Foreign,July,432
Documentary,July,429
Animation,August,408


#### Clean up the File System

In [0]:
%fs rm -r /FileStore/lab_data/