# Final Project

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Notebook-wide configuration: source connection settings, DBFS paths for the
# lakehouse, and a cleanup of any output left behind by a previous run.
#
# NOTE(review): user names and passwords are hard-coded in this cell; consider
# reading them from a secret store instead of committing them with the notebook.

# Azure MySQL Server Connection Information ###################
jdbc_hostname = "mbz8dg-ds2002.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

# NOTE(review): "user" is "tyler_mcf" here, but the %sql JDBC views further down
# connect as "tylermcf" - confirm which spelling is correct.
connection_properties = {
  "user" : "tyler_mcf",
  "password" : "thePassword4",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.hkyzauy"
atlas_database_name = "sakila_dw"
atlas_user_name = "mbz8dg"
atlas_password = "thePassword4"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

# Root of every path below; database_dir resolves to dbfs:/FileStore/tables/sakila_dlh.
base_dir = "dbfs:/FileStore/tables"
database_dir = f"{base_dir}/{dst_database}"

# Locations of the source files, split into batch and streaming inputs.
data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

# Bronze / silver / gold output paths, one triple per fact table.
orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
# The second argument (True) requests a recursive delete. All three directories
# live under database_dir, so the final rm below would remove them as well.
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
# NOTE(review): the CREATE DATABASE cell uses LOCATION
# dbfs:/FileStore/ds2002-lab06/sakila_dlh, which is not under database_dir, so
# this call does not remove files stored at that location - confirm the intent.
dbutils.fs.rm(database_dir, True)

Out[2]: True

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Fetch documents from a MongoDB Atlas collection into a DataFrame.

    Args:
        user_id: Atlas user name placed in the connection URI.
        pwd: Password for ``user_id``.
        cluster_name: Atlas cluster host prefix; ``.mongodb.net`` is appended.
        db_name: Name of the database to query.
        collection: Name of the collection inside ``db_name``.
        conditions: Query filter. A falsy value (``None`` / ``{}``) matches
            every document.
        projection: Field projection. A falsy value returns all fields.
        sort: Sort specification handed to ``Cursor.sort``. A falsy value
            leaves the result unsorted.

    Returns:
        ``pd.DataFrame`` built from the list of matching documents.

    Each optional argument is applied independently. Previously a filter was
    silently ignored unless a projection was supplied as well, and a sort was
    ignored unless both of the others were supplied.
    """
    # NOTE(review): user_id and pwd are interpolated verbatim; the caller must
    # percent-escape any reserved URI characters they contain.
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    try:
        # Query MongoDB and materialize the cursor into a Python list.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        dframe = pd.DataFrame(list(cursor))
    finally:
        # Release the connection even when the query or conversion fails.
        client.close()

    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """Replace MongoDB Atlas collections with the contents of JSON files.

    Args:
        user_id: Atlas user name placed in the connection URI.
        pwd: Password for ``user_id``.
        cluster_name: Atlas cluster host prefix; ``.mongodb.net`` is appended.
        db_name: Name of the target database.
        src_file_path: Directory that contains the JSON files.
        json_files: Mapping of collection name to JSON file name (relative to
            ``src_file_path``).

    Returns:
        The value returned by ``insert_many`` for the last collection loaded,
        or ``None`` when ``json_files`` is empty.

    Each listed collection is dropped before its file is loaded.
    """
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    # Defined up front so an empty mapping returns None instead of raising
    # UnboundLocalError at the return statement.
    result = None
    try:
        db = client[db_name]
        for collection_name in json_files:
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, json_files[collection_name])
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)
            result = db[collection_name].insert_many(documents)
    finally:
        # Release the connection even when a file is missing or an insert fails.
        client.close()

    return result

In [0]:
%sql
-- Start from a clean slate: drop any existing sakila_dlh database along with
-- every table it contains (CASCADE).
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
-- Create the lakehouse database that holds the dimension and fact tables built below.
-- NOTE(review): this LOCATION is under dbfs:/FileStore/ds2002-lab06, whereas the Python
-- config cell sets database_dir to dbfs:/FileStore/tables/sakila_dlh - confirm which is intended.
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

### Date Dimension Table (From Azure)

In [0]:
%sql
-- Temporary view over the dim_date table of the sakila_dw database on the Azure MySQL
-- server, read through Spark's JDBC data source.
-- NOTE(review): credentials are hard-coded here, and the user "tylermcf" differs from
-- connection_properties["user"] ("tyler_mcf") in the config cell - confirm which is correct.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://mbz8dg-ds2002.mysql.database.azure.com:3306/sakila_dw", 
  dbtable "dim_date",
  user "tylermcf",
  password "thePassword4"
)

In [0]:
%sql
-- Make sakila_dlh the current database for this and later cells.
USE DATABASE sakila_dlh;

-- Copy every row exposed by view_date into the table sakila_dlh.dim_date,
-- stored at the explicit LOCATION below.
CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the columns, data types, and table metadata of the date dimension.
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,timestamp,
day_of_week,bigint,
day_name_of_week,string,
day_of_month,bigint,
day_of_year,bigint,
weekday_weekend,string,
week_of_year,bigint,
month_name,string,
month_of_year,bigint,
is_last_day_of_month,string,


In [0]:
%sql
-- Preview five rows of the date dimension.
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
2000-01-01T00:00:00.000+0000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
2000-01-02T00:00:00.000+0000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
2000-01-03T00:00:00.000+0000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
2000-01-04T00:00:00.000+0000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
2000-01-05T00:00:00.000+0000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


### Inventory Dimension Table (From MySQL)

In [0]:
%sql
-- Create a temporary view named "view_inventory" that reads the dim_inventory table
-- of the sakila_dw database on the Azure MySQL server through Spark's JDBC data source.
-- NOTE(review): credentials are hard-coded here, and the user "tylermcf" differs from
-- connection_properties["user"] ("tyler_mcf") in the config cell - confirm which is correct.
CREATE OR REPLACE TEMPORARY VIEW view_inventory
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://mbz8dg-ds2002.mysql.database.azure.com:3306/sakila_dw", 
  dbtable "dim_inventory",
  user "tylermcf",
  password "thePassword4"
)

In [0]:
%sql
-- Make sakila_dlh the current database for this and later cells.
USE DATABASE sakila_dlh;

-- Create the table "sakila_dlh.dim_inventory" from the rows exposed by "view_inventory".
-- The table COMMENT now names the inventory dimension (it previously said "Product").
CREATE OR REPLACE TABLE sakila_dlh.dim_inventory
COMMENT "Inventory Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_inventory"
AS SELECT * FROM view_inventory

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the columns, data types, and table metadata of the inventory dimension.
DESCRIBE EXTENDED sakila_dlh.dim_inventory;

col_name,data_type,comment
inventory_id,bigint,
film_id,int,
store_id,int,
last_update,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Type,EXTERNAL,


In [0]:
%sql
-- Preview five rows of the inventory dimension.
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

inventory_id,film_id,store_id,last_update
1,1,1,2006-02-15T05:09:17.000+0000
2,1,1,2006-02-15T05:09:17.000+0000
3,1,1,2006-02-15T05:09:17.000+0000
4,1,1,2006-02-15T05:09:17.000+0000
5,1,2,2006-02-15T05:09:17.000+0000


### Customer Dimension (from MongoDB)

In [0]:
%scala
// Load the "customer" collection of the MongoDB "sakila" database through the MongoDB
// Spark connector, keep the nine listed columns, and display the result.
// NOTE(review): the Atlas user name and password are embedded in the URI below; consider
// reading them from a secret store.
// NOTE(review): the database read here is "sakila", while the Python config cell sets
// atlas_database_name to "sakila_dw" - confirm which database is intended.
import com.mongodb.spark._
val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb+srv://mbz8dg:thePassword4@cluster0.hkyzauy.mongodb.net")
.option("database", "sakila").option("collection", "customer").load()
.select("customer_id","store_id","last_name","first_name","email","address_id","active","create_date","last_update")
display(df_customer)


customer_id,store_id,last_name,first_name,email,address_id,active,create_date,last_update
1,1,SMITH,MARY,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
18,2,GARCIA,CAROL,CAROL.GARCIA@sakilacustomer.org,22,1,2006-02-14 22:04:36,2006-02-15 04:57:20
107,1,WOODS,FLORENCE,FLORENCE.WOODS@sakilacustomer.org,111,1,2006-02-14 22:04:36,2006-02-15 04:57:20
143,1,GORDON,LESLIE,LESLIE.GORDON@sakilacustomer.org,147,1,2006-02-14 22:04:36,2006-02-15 04:57:20
151,2,PALMER,MEGAN,MEGAN.PALMER@sakilacustomer.org,155,1,2006-02-14 22:04:36,2006-02-15 04:57:20
157,2,ROSE,DARLENE,DARLENE.ROSE@sakilacustomer.org,161,1,2006-02-14 22:04:36,2006-02-15 04:57:20
183,2,ANDREWS,IDA,IDA.ANDREWS@sakilacustomer.org,187,1,2006-02-14 22:04:36,2006-02-15 04:57:20
187,2,RILEY,BRITTANY,BRITTANY.RILEY@sakilacustomer.org,191,1,2006-02-14 22:04:36,2006-02-15 04:57:20
188,1,ARMSTRONG,MELANIE,MELANIE.ARMSTRONG@sakilacustomer.org,192,1,2006-02-14 22:04:36,2006-02-15 04:57:20
193,2,ELLIOTT,KATIE,KATIE.ELLIOTT@sakilacustomer.org,197,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [0]:
%scala
// Print the column names and types of the customer DataFrame.
df_customer.printSchema()

In [0]:
%scala
// Save the customer DataFrame as the Delta table sakila_dlh.dim_customer,
// replacing any existing contents (mode "overwrite").
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
-- Inspect the columns, data types, and table metadata of the customer dimension.
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_id,int,
store_id,int,
last_name,string,
first_name,string,
email,string,
address_id,int,
active,int,
create_date,string,
last_update,string,
,,


In [0]:
%sql
-- Preview five rows of the customer dimension.
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_id,store_id,last_name,first_name,email,address_id,active,create_date,last_update
1,1,SMITH,MARY,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
18,2,GARCIA,CAROL,CAROL.GARCIA@sakilacustomer.org,22,1,2006-02-14 22:04:36,2006-02-15 04:57:20
107,1,WOODS,FLORENCE,FLORENCE.WOODS@sakilacustomer.org,111,1,2006-02-14 22:04:36,2006-02-15 04:57:20
143,1,GORDON,LESLIE,LESLIE.GORDON@sakilacustomer.org,147,1,2006-02-14 22:04:36,2006-02-15 04:57:20
151,2,PALMER,MEGAN,MEGAN.PALMER@sakilacustomer.org,155,1,2006-02-14 22:04:36,2006-02-15 04:57:20


### Staff Dimension (From CSV file system)

In [0]:
# Read the staff CSV from DBFS, treating the first row as the header and letting
# Spark infer each column's type, then display the resulting DataFrame.
staff_csv = "dbfs:/FileStore/Staff.csv"
df_staff = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(staff_csv)
)
display(df_staff)

staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2/15/06 3:57
2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2/15/06 3:57


In [0]:
# Print the column names and inferred types of the staff DataFrame.
df_staff.printSchema()

root
 |-- staff_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- picture: string (nullable = true)
 |-- email: string (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)
 |-- last_update: string (nullable = true)



In [0]:
# Save the staff DataFrame as the Delta table sakila_dlh.dim_staff, replacing any
# existing contents (mode "overwrite").
# NOTE(review): every CSV column is written, including "password" and "picture";
# confirm that these belong in the dimension table.
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
-- Inspect the columns, data types, and table metadata of the staff dimension.
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_id,int,
first_name,string,
last_name,string,
address_id,int,
picture,string,
email,string,
store_id,int,
active,int,
username,string,
password,string,


In [0]:
%sql
-- Preview up to five rows of the staff dimension.
SELECT * FROM sakila_dlh.dim_staff LIMIT 5;

staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2/15/06 3:57
2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2/15/06 3:57


### Final Dimension Tables

In [0]:
%sql
-- Switch to sakila_dlh and list its tables together with the session's temporary views.
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_staff,False
,view_date,True
,view_inventory,True


### Fact Rental Data using spark.readStream and Auto Loader (Hot Path, JSON files)

In [0]:
# Stream the rental JSON files with Auto Loader ("cloudFiles") and expose the
# stream to SQL cells as the temporary view "rentals_raw_tempview".
# NOTE(review): the schema location sits under .../fact_orders although the data
# is rentals - confirm the directory name is intentional.
(
    spark.readStream
    .format("cloudFiles")
    .options(**{
        "cloudFiles.format": "json",
        "cloudFiles.schemaLocation": "dbfs:/FileStore/tables/sakila_dlh/fact_orders/_schemas",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    })
    .load("dbfs:/FileStore/tables/source_data/stream/rentals")
    .createOrReplaceTempView("rentals_raw_tempview")
)

In [0]:
%sql
-- Bronze view: every raw rental column except rental_key, plus two audit columns -
-- receipt_time (current_timestamp()) and source_file (input_file_name()).
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT * EXCEPT (rental_key), current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
-- Display rows from the bronze view, which is backed by the streaming read of the raw rental files.
SELECT * FROM rentals_bronze_tempview

amount,customer_key,fact_rental_key,inventory_key,payment_key,return_key,staff_key,_rescued_data,receipt_time,source_file
6.99,468,101,617,2005-05-25 17:17:04,2005-05-31 19:47:04,1,,2023-05-08T18:17:20.670+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals2.json
3.99,343,102,373,2005-05-25 17:22:10,2005-05-31 19:47:10,1,,2023-05-08T18:17:20.670+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals2.json
4.99,384,103,3343,2005-05-25 17:30:42,2005-06-03 22:36:42,1,,2023-05-08T18:17:20.670+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals2.json
0.99,310,104,4281,2005-05-25 17:46:33,2005-05-27 15:20:33,1,,2023-05-08T18:17:20.670+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals2.json
4.99,108,105,794,2005-05-25 17:54:12,2005-05-30 12:03:12,2,,2023-05-08T18:17:20.670+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals2.json
11.99,196,106,3627,2005-05-25 18:18:19,2005-06-04 00:01:19,2,,2023-05-08T18:17:20.670+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals2.json
6.99,317,107,2833,2005-05-25 18:28:09,2005-06-03 22:46:09,2,,2023-05-08T18:17:20.670+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals2.json
2.99,242,108,3289,2005-05-25 18:30:05,2005-05-30 19:40:05,1,,2023-05-08T18:17:20.670+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals2.json
1.99,503,109,1044,2005-05-25 18:40:20,2005-05-29 20:39:20,2,,2023-05-08T18:17:20.670+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals2.json
9.99,19,110,4108,2005-05-25 18:43:49,2005-06-03 18:13:49,2,,2023-05-08T18:17:20.670+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals2.json


In [0]:
# Append the bronze view to the Delta table "fact_rentals_bronze" as a streaming
# write; the checkpoint location records the stream's progress.
# NOTE(review): the checkpoint lives under .../fact_orders although the table is
# fact_rentals_bronze - confirm the directory name is intentional.
(spark.table("rentals_bronze_tempview")
     .writeStream
     .format("delta")
     .option("checkpointLocation", f"dbfs:/FileStore/tables/sakila_dlh/fact_orders/checkpoint")
     .outputMode("append")
     .table("fact_rentals_bronze"))

Out[27]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f233e165610>

### 8.2. Silver Table: Include Reference Data

In [0]:
# Read the bronze Delta table back as a stream and expose it to SQL cells as the
# temporary view "rentals_silver_tempview".
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
-- Display rows from the streaming view over the bronze table.
SELECT * FROM rentals_silver_tempview

amount,customer_key,fact_rental_key,inventory_key,payment_key,return_key,staff_key,_rescued_data,receipt_time,source_file
2.99,130,1,367,2005-05-24 22:53:30,2005-05-26 22:04:30,1,,2023-05-08T18:17:22.203+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals1.json
2.99,459,2,1525,2005-05-24 22:54:33,2005-05-28 19:40:33,1,,2023-05-08T18:17:22.203+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals1.json
3.99,408,3,1711,2005-05-24 23:03:39,2005-06-01 22:12:39,1,,2023-05-08T18:17:22.203+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals1.json
4.99,333,4,2452,2005-05-24 23:04:41,2005-06-03 01:43:41,2,,2023-05-08T18:17:22.203+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals1.json
6.99,222,5,2079,2005-05-24 23:05:21,2005-06-02 04:33:21,1,,2023-05-08T18:17:22.203+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals1.json
0.99,549,6,2792,2005-05-24 23:08:07,2005-05-27 01:32:07,1,,2023-05-08T18:17:22.203+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals1.json
1.99,269,7,3995,2005-05-24 23:11:53,2005-05-29 20:34:53,2,,2023-05-08T18:17:22.203+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals1.json
4.99,239,8,2346,2005-05-24 23:31:46,2005-05-27 23:33:46,2,,2023-05-08T18:17:22.203+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals1.json
4.99,126,9,2580,2005-05-25 00:00:40,2005-05-28 00:22:40,1,,2023-05-08T18:17:22.203+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals1.json
5.99,399,10,1824,2005-05-25 00:02:21,2005-05-31 22:44:21,2,,2023-05-08T18:17:22.203+0000,dbfs:/FileStore/tables/source_data/stream/rentals/rentals1.json


In [0]:
%sql
-- Inspect the column names and data types of the streaming view over the bronze table.
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
amount,double,
customer_key,bigint,
fact_rental_key,bigint,
inventory_key,bigint,
payment_key,string,
return_key,string,
staff_key,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
-- Silver view: enrich each streamed rental with reference data from the dimensions.
--   * customer, inventory and date lookups are LEFT joins, so rentals without a match are kept;
--   * the staff lookup is an INNER join, so rentals with no matching staff row are dropped.
-- payment_key is cast to DATE in the select list and compared with the date part of dim_date.date_key.
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT
    r.fact_rental_key,
    r.customer_key,
    c.last_name AS customer_name,
    r.inventory_key,
    i.film_id,
    i.store_id,
    r.staff_key,
    s.last_name AS staff_name,
    CAST(r.payment_key AS DATE),
    d.day_name_of_week
  FROM rentals_silver_tempview AS r
  LEFT JOIN sakila_dlh.dim_customer AS c
    ON r.customer_key = c.customer_id
  LEFT JOIN sakila_dlh.dim_inventory AS i
    ON r.inventory_key = i.inventory_id
  INNER JOIN sakila_dlh.dim_staff AS s
    ON r.staff_key = s.staff_id
  LEFT JOIN sakila_dlh.dim_date AS d
    ON r.payment_key = CAST(d.date_key AS DATE)
)

In [0]:
%sql

-- Display rows from the enriched silver view.
select * from fact_rentals_silver_tempview

fact_rental_key,customer_key,customer_name,inventory_key,film_id,store_id,staff_key,staff_name,payment_key,day_name_of_week
1,130,HUNTER,367,80,1,1,Hillyer,2005-05-24,Tuesday
2,459,COLLAZO,1525,333,2,1,Hillyer,2005-05-24,Tuesday
3,408,MURRELL,1711,373,2,1,Hillyer,2005-05-24,Tuesday
4,333,PURDY,2452,535,1,2,Stephens,2005-05-24,Tuesday
5,222,HANSEN,2079,450,2,1,Hillyer,2005-05-24,Tuesday
6,549,CHRISTENSON,2792,613,1,1,Hillyer,2005-05-24,Tuesday
7,269,WALTERS,3995,870,2,2,Stephens,2005-05-24,Tuesday
8,239,ROMERO,2346,510,1,2,Stephens,2005-05-24,Tuesday
9,126,SIMPSON,2580,565,1,1,Hillyer,2005-05-25,Wednesday
10,399,ISOM,1824,396,2,2,Stephens,2005-05-25,Wednesday


In [0]:
# Append the enriched silver view to the Delta table "fact_rentals_silver2" as a
# streaming write.
#
# The checkpoint now uses the dbfs:/ scheme and its own directory under
# dbfs:/FileStore/tables/sakila_dlh. The previous value, "/dbfs/FileStore/...",
# is the local-mount form of the path; handed to a Spark API it points outside
# the directory tree that the config cell deletes, so a stale checkpoint could
# survive a re-run of the notebook.
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "dbfs:/FileStore/tables/sakila_dlh/fact_rentals_silver/checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver2"))

Out[47]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f233d390a90>

In [0]:
%sql
-- Display the rows that the silver stream has written to the Delta table so far.
SELECT * FROM fact_rentals_silver2

fact_rental_key,customer_key,customer_name,inventory_key,film_id,store_id,staff_key,staff_name,payment_key,day_name_of_week
1,130,HUNTER,367,80,1,1,Hillyer,2005-05-24,Tuesday
2,459,COLLAZO,1525,333,2,1,Hillyer,2005-05-24,Tuesday
3,408,MURRELL,1711,373,2,1,Hillyer,2005-05-24,Tuesday
4,333,PURDY,2452,535,1,2,Stephens,2005-05-24,Tuesday
5,222,HANSEN,2079,450,2,1,Hillyer,2005-05-24,Tuesday
6,549,CHRISTENSON,2792,613,1,1,Hillyer,2005-05-24,Tuesday
7,269,WALTERS,3995,870,2,2,Stephens,2005-05-24,Tuesday
8,239,ROMERO,2346,510,1,2,Stephens,2005-05-24,Tuesday
9,126,SIMPSON,2580,565,1,1,Hillyer,2005-05-25,Wednesday
10,399,ISOM,1824,396,2,2,Stephens,2005-05-25,Wednesday


In [0]:
%sql
-- Inspect the columns, data types, and table metadata of the silver fact table.
DESCRIBE EXTENDED fact_rentals_silver2

col_name,data_type,comment
fact_rental_key,bigint,
customer_key,bigint,
customer_name,string,
inventory_key,bigint,
film_id,int,
store_id,int,
staff_key,bigint,
staff_name,string,
payment_key,date,
day_name_of_week,string,


### 8.3. Gold Table: Perform Aggregations

In [0]:
%sql

-- Gold aggregation: number of rentals handled by each staff member per day of the week.
-- Reads sakila_dlh.fact_rentals_silver2, the table the silver stream in this notebook writes;
-- the previously referenced fact_rentals_silver is not created anywhere after the
-- DROP DATABASE ... CASCADE at the top, so the query failed on a fresh run.
-- staff_name is added as a secondary sort key so the row order within a day is deterministic.
SELECT day_name_of_week,
  count(staff_key) AS interactions,
  staff_name
FROM sakila_dlh.fact_rentals_silver2
GROUP BY staff_name, day_name_of_week
ORDER BY day_name_of_week, staff_name
  

day_name_of_week,interactions,staff_name
Thursday,70,Hillyer
Thursday,83,Stephens
Tuesday,3,Stephens
Tuesday,5,Hillyer
Wednesday,69,Hillyer
Wednesday,67,Stephens
