In [0]:
## This is my final project for DS 2002 

In [0]:
###Import required libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

In [0]:
###Instantiate global variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "mrd2wdz-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw2"

connection_properties = {
  "user" : "mrd2wdz",
  "password" : "Peachesrule1",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "ds2002-md.b8ktadu"
atlas_database_name = "sakila_dw2"
atlas_user_name = "mrd2wdz"
atlas_password = "Peachesrule1"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/ds2002-lab06"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/sourcedata"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rental_stream_dir = f"{stream_dir}/rental"

rental_output_bronze = f"{database_dir}/fact_rental/bronze"
rental_output_silver = f"{database_dir}/fact_rental/silver"
rental_output_gold   = f"{database_dir}/fact_rental/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rental", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[5]: True

In [0]:
###Define global functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

In [0]:
## Populate Dimensions by Ingesting Reference (Cold-path) Data
## Fetch Reference Data From an Azure MySQL Database
## Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Lab 06 Database"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Lab 6.0");

In [0]:
###Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://mrd2wdz-mysql.mysql.database.azure.com:3306/sakila_dw2",
  dbtable "dim_date",
  user "mrd2wdz",
  password "Peachesrule1"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,bigint,
full_date,date,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_date,
Type,EXTERNAL,
Comment,Date Dimension Table,
Location,dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_date,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date
20050101,2005-01-01
20050102,2005-01-02
20050103,2005-01-03
20050104,2005-01-04
20050105,2005-01-05


In [0]:
###Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://mrd2wdz-mysql.mysql.database.azure.com:3306/sakila_dw2",
  dbtable "dim_customer",
  user "mrd2wdz",
  password "Peachesrule1"
)

In [0]:
%sql
USE DATABASE sakila_dlh;
CREATE OR REPLACE TABLE sakila_dlh.dim_customer
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,bigint,
store_id,bigint,
first_name,string,
last_name,string,
email,string,
address_id,bigint,
active,bigint,
create_date,string,
last_update,string,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [0]:
##Fetch Reference Data from a MongoDB Atlas Database
##View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-lab06/sourcedata/batch/sakila_diminventory.csv,sakila_diminventory.csv,12432,1683041816000
dbfs:/FileStore/ds2002-lab06/sourcedata/batch/sakila_staffdim.json,sakila_staffdim.json,468,1683041816000


In [0]:
##Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/ds2002-lab06/sourcedata/batch'
json_files = {"staff" : 'sakila_staffdim.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[24]: <pymongo.results.InsertManyResult at 0x7f65c884c6c0>

In [0]:
##Fetch Staff Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_staff = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2").option("collection", "staff").load()
.select("staff_key","first_name","last_name","address_id","email","store_id","active","username","password","last_update")

display(df_staff)

staff_key,first_name,last_name,address_id,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [0]:
%scala
df_staff.printSchema()

In [0]:
##Use the Spark DataFrame to Create a New Staff Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff

col_name,data_type,comment
staff_key,int,
first_name,string,
last_name,string,
address_id,int,
email,string,
store_id,int,
active,int,
username,string,
password,string,
last_update,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff

staff_key,first_name,last_name,address_id,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [0]:
##Fetch Data from a File System
##Use PySpark to Read From a CSV File

In [0]:
inventory_csv = f"{batch_dir}/sakila_diminventory.csv"

df_inventory = spark.read.format('csv').options(header='true', inferSchema='true').load(inventory_csv)
display(df_inventory)

inventory_key,rental_rate,film_id
1,0.99,1
2,0.99,1
3,0.99,1
4,0.99,1
5,0.99,1
6,0.99,1
7,0.99,1
8,0.99,1
9,4.99,2
10,4.99,2


In [0]:
df_inventory.printSchema()

root
 |-- inventory_key: integer (nullable = true)
 |-- rental_rate: double (nullable = true)
 |-- film_id: integer (nullable = true)



In [0]:
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory;

col_name,data_type,comment
inventory_key,int,
rental_rate,double,
film_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Type,MANAGED,
Location,dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_inventory,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5;

inventory_key,rental_rate,film_id
1,0.99,1
2,0.99,1
3,0.99,1
4,0.99,1
5,0.99,1


In [0]:
# verify dimension tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_staff,False
,view_customer,True
,view_date,True


In [0]:
## Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data
## Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_rental_key BIGINT")
 .option("cloudFiles.schemaHints", "inventory_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT")
 .option("cloudFiles.schemaHints", "staff_key BIGINT") 
 .option("cloudFiles.schemaHints", "amount DECIMAL")
 .option("cloudFiles.schemaHints", "last_update STRING")
 .option("cloudFiles.schemaHints", "rental_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "return_date_key DECIMAL")
 .option("cloudFiles.schemaLocation", rental_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rental_stream_dir)
 .createOrReplaceTempView("rental_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rental_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rental_raw_tempview
)

In [0]:
%sql
SELECT * FROM rental_bronze_tempview LIMIT 5

amount,customer_key,fact_rental_key,inventory_key,last_update,rental_date_key,return_date_key,staff_key,_rescued_data,receipt_time,source_file
2.99,130,1,367,2006-02-15 21:30:53,20050524,20050526,1,,2023-05-02T15:39:23.585+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
2.99,459,2,1525,2006-02-15 21:30:53,20050524,20050528,1,,2023-05-02T15:39:23.585+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
3.99,408,3,1711,2006-02-15 21:30:53,20050524,20050601,1,,2023-05-02T15:39:23.585+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
4.99,333,4,2452,2006-02-15 21:30:53,20050524,20050603,2,,2023-05-02T15:39:23.585+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
6.99,222,5,2079,2006-02-15 21:30:53,20050524,20050602,1,,2023-05-02T15:39:23.585+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
0.99,549,6,2792,2006-02-15 21:30:53,20050524,20050527,1,,2023-05-02T15:39:23.585+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
1.99,269,7,3995,2006-02-15 21:30:53,20050524,20050529,2,,2023-05-02T15:39:23.585+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
4.99,239,8,2346,2006-02-15 21:30:53,20050524,20050527,2,,2023-05-02T15:39:23.585+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
4.99,126,9,2580,2006-02-15 21:30:53,20050525,20050528,1,,2023-05-02T15:39:23.585+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
5.99,399,10,1824,2006-02-15 21:30:53,20050525,20050531,2,,2023-05-02T15:39:23.585+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json


In [0]:
(spark.table("rental_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_bronze"))

Out[42]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f65c8872550>

In [0]:
# silver table: include reference data

In [0]:
(spark.readStream
  .table("fact_rental_bronze")
  .createOrReplaceTempView("rental_silver_tempview"))

In [0]:
%sql
SELECT * FROM rental_silver_tempview LIMIT 5

amount,customer_key,fact_rental_key,inventory_key,last_update,rental_date_key,return_date_key,staff_key,_rescued_data,receipt_time,source_file
2.99,130,1,367,2006-02-15 21:30:53,20050524,20050526,1,,2023-05-02T15:40:41.873+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
2.99,459,2,1525,2006-02-15 21:30:53,20050524,20050528,1,,2023-05-02T15:40:41.873+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
3.99,408,3,1711,2006-02-15 21:30:53,20050524,20050601,1,,2023-05-02T15:40:41.873+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
4.99,333,4,2452,2006-02-15 21:30:53,20050524,20050603,2,,2023-05-02T15:40:41.873+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json
6.99,222,5,2079,2006-02-15 21:30:53,20050524,20050602,1,,2023-05-02T15:40:41.873+0000,dbfs:/FileStore/ds2002-lab06/sourcedata/stream/rental/sakila_fact_rental.json


In [0]:
%sql
DESCRIBE EXTENDED rental_silver_tempview 

col_name,data_type,comment
amount,double,
customer_key,bigint,
fact_rental_key,bigint,
inventory_key,bigint,
last_update,string,
rental_date_key,bigint,
return_date_key,"decimal(10,0)",
staff_key,bigint,
_rescued_data,string,
receipt_time,timestamp,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rental_silver_tempview AS (
  SELECT r.fact_rental_key,
      r.inventory_key,
      i.rental_rate,
      i.film_id AS film,
      r.customer_key,
      c.store_id AS store,
      c.first_name AS customer_first_name,
      c.last_name AS customer_last_name,
      r.staff_key,
      s.first_name AS staff_first_name,
      s.last_name AS staff_last_name,
      r.amount,
      r.last_update,
      r.rental_date_key,
      r.return_date_key
  FROM rental_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_inventory AS i
  ON i.inventory_key = r.inventory_key
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = r.customer_key
  INNER JOIN sakila_dlh.dim_staff AS s
  ON s.staff_key = r.staff_key
)

In [0]:
(spark.table("fact_rental_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_silver"))

Out[52]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f65c1cc21f0>

In [0]:
%sql
SELECT * FROM fact_rental_silver LIMIT 5

fact_rental_key,inventory_key,rental_rate,film,customer_key,store,customer_first_name,customer_last_name,staff_key,staff_first_name,staff_last_name,amount,last_update,rental_date_key,return_date_key
1,367,2.99,80,130,1,CHARLOTTE,HUNTER,1,Mike,Hillyer,2.99,2006-02-15 21:30:53,20050524,20050526
16,389,4.99,86,316,1,STEVEN,CURLEY,2,Jon,Stephens,4.99,2006-02-15 21:30:53,20050525,20050526
17,830,2.99,181,575,2,ISAAC,OGLESBY,1,Mike,Hillyer,2.99,2006-02-15 21:30:53,20050525,20050527
21,146,4.99,31,388,2,CRAIG,MORRELL,2,Jon,Stephens,4.99,2006-02-15 21:30:53,20050525,20050526
22,727,4.99,159,509,1,RAUL,FORTIER,2,Jon,Stephens,4.99,2006-02-15 21:30:53,20050525,20050526


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rental_silver

col_name,data_type,comment
fact_rental_key,bigint,
inventory_key,bigint,
rental_rate,double,
film,int,
customer_key,bigint,
store,bigint,
customer_first_name,string,
customer_last_name,string,
staff_key,bigint,
staff_first_name,string,


In [0]:
# gold table: perform aggregations

In [0]:
%sql
SELECT amount as Total_Amount
  , customer_last_name AS Customer
  , staff_last_name AS Staff_Member
FROM sakila_dlh.fact_rental_silver
GROUP BY Total_Amount, Customer, Staff_Member
ORDER BY Total_Amount DESC


Total_Amount,Customer,Staff_Member
10.99,ROYAL,Hillyer
10.99,GRAVES,Hillyer
9.99,CHISHOLM,Hillyer
9.99,CHAPMAN,Stephens
9.99,LAWSON,Stephens
9.99,JAMES,Stephens
8.99,HIDALGO,Stephens
8.99,MARKHAM,Stephens
8.99,KINDER,Stephens
8.99,SANBORN,Stephens
