In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ds2002-zmm8xd.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila2"

connection_properties = {
  "user" : "JonathanM",
  "password" : "Ronaldo03",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "ds2002.kz1qzp7"
atlas_database_name = "sakila2"
atlas_user_name = "zmm8xd"
atlas_password = "Ronaldo03"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/final_project_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rentals_stream_dir = f"{stream_dir}/orders"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe


##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
--Create the database sakila_dlh
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS2002 Final Project"
LOCATION "dbfs:/FileStore/final_project_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS2002 Final");

In [0]:
%sql
--Connect to mysql and create the temporary table "view_date" from dim_date
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-zmm8xd.mysql.database.azure.com:3306/sakila2?useSSL=true&requireSSL=true", 
  dbtable "dim_date",
  user "JonathanM",    
  password "Ronaldo03" 
)

In [0]:
%sql
--Create dim_date in sakila_dlh
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_project_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5   

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [0]:
%sql
-- Connect to mysql and create "view_film" using dim_customer
CREATE OR REPLACE TEMPORARY VIEW view_film
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-zmm8xd.mysql.database.azure.com:3306/sakila2?useSSL=true&requireSSL=true", 
  dbtable "dim_customer",
  user "JonathanM",    
  password "Ronaldo03"  
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create dim_customer in sakila_dlh from view_film
CREATE OR REPLACE TABLE sakila_dlh.dim_customer
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/final_project_data/sakila_dlh/dim_customer"
AS SELECT * FROM view_film

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,bigint,
store_key,bigint,
first_name,varchar(65535),
last_name,varchar(65535),
email,varchar(65535),
address_key,bigint,
active,bigint,
,,
# Delta Statistics Columns,,
Column Names,"first_name, email, last_name, store_key, customer_key, address_key, active",


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,store_key,first_name,last_name,email,address_key,active
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1


In [0]:
display(dbutils.fs.ls(batch_dir))
# dim_payment is my json file, dim_rental is my csv file

path,name,size,modificationTime
dbfs:/FileStore/final_project_data/retail/batch/sakila2_dim_payment.json,sakila2_dim_payment.json,154950,1701988673000
dbfs:/FileStore/final_project_data/retail/batch/sakila2_dim_rental.csv,sakila2_dim_rental.csv,47929,1701988682000


In [0]:
#Set up pymongo connection
from pymongo import MongoClient

client = MongoClient("mongodb+srv://zmm8xd:Ronaldo03@ds2002.kz1qzp7.mongodb.net/test?ssl=true")

db = client[atlas_database_name]

In [0]:
source_dir = '/dbfs/FileStore/final_project_data/retail/batch'
json_files = {
    "payment": 'sakila2_dim_payment.json'
}
set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

<pymongo.results.InsertManyResult at 0x7ff8004c1780>

In [0]:
%scala
import com.mongodb.spark._

val userName = "zmm8xd"
val pwd = "Ronaldo03"
val clusterName = "ds2002.kz1qzp7"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

//create df_payment
val df_payment = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila2")
.option("collection", "payment").load()
.select("payment_key", "customer_key", "rental_key", "staff_key", "payment_date", "amount")

display(df_payment)

payment_key,customer_key,rental_key,staff_key,payment_date,amount
1,1,76,1,2005-05-25 11:30:37,2.99
2,1,573,1,2005-05-28 10:35:23,0.99
3,1,1185,1,2005-06-15 00:54:12,5.99
4,1,1422,2,2005-06-15 18:02:53,0.99
5,1,1476,2,2005-06-15 21:08:46,9.99
6,1,1725,1,2005-06-16 15:18:57,4.99
7,1,2308,1,2005-06-18 08:41:48,4.99
8,1,2363,2,2005-06-18 13:33:59,0.99
9,1,3284,1,2005-06-21 06:24:45,3.99
10,1,4526,2,2005-07-08 03:17:05,5.99


In [0]:
%scala
df_payment.printSchema()

In [0]:
%scala
//Creat dim_payment in sakila_dlh from df_payment
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_payment")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payment

col_name,data_type,comment
payment_key,int,
customer_key,int,
rental_key,int,
staff_key,int,
payment_date,string,
amount,double,
,,
# Delta Statistics Columns,,
Column Names,"rental_key, payment_date, amount, staff_key, payment_key, customer_key",
Column Selection Method,first-32,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_payment LIMIT 5

payment_key,customer_key,rental_key,staff_key,payment_date,amount
1,1,76,1,2005-05-25 11:30:37,2.99
2,1,573,1,2005-05-28 10:35:23,0.99
3,1,1185,1,2005-06-15 00:54:12,5.99
4,1,1422,2,2005-06-15 18:02:53,0.99
5,1,1476,2,2005-06-15 21:08:46,9.99


In [0]:
#Create df_rental from the csv file dim_rental
customer_csv = f"{batch_dir}/sakila2_dim_rental.csv"

df_rental = spark.read.format('csv').options(header='true', inferSchema='true').load(customer_csv)
display(df_rental)

rental_key,rental_date,return_date
1,2005-05-24T22:53:30Z,2005-05-26T22:04:30Z
2,2005-05-24T22:54:33Z,2005-05-28T19:40:33Z
3,2005-05-24T23:03:39Z,2005-06-01T22:12:39Z
4,2005-05-24T23:04:41Z,2005-06-03T01:43:41Z
5,2005-05-24T23:05:21Z,2005-06-02T04:33:21Z
6,2005-05-24T23:08:07Z,2005-05-27T01:32:07Z
7,2005-05-24T23:11:53Z,2005-05-29T20:34:53Z
8,2005-05-24T23:31:46Z,2005-05-27T23:33:46Z
9,2005-05-25T00:00:40Z,2005-05-28T00:22:40Z
10,2005-05-25T00:02:21Z,2005-05-31T22:44:21Z


In [0]:
df_rental.printSchema()

root
 |-- rental_key: integer (nullable = true)
 |-- rental_date: timestamp (nullable = true)
 |-- return_date: timestamp (nullable = true)



In [0]:
#Create dim_rental in sakila_dlh from df_rental
from pyspark.sql.functions import col
df_rental = df_rental.withColumn("rental_key", col("rental_key").cast("long"))

df_rental.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_rental")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_rental;

col_name,data_type,comment
rental_key,bigint,
rental_date,timestamp,
return_date,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"rental_key, rental_date, return_date",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_rental LIMIT 5;

rental_key,rental_date,return_date
1,2005-05-24T22:53:30Z,2005-05-26T22:04:30Z
2,2005-05-24T22:54:33Z,2005-05-28T19:40:33Z
3,2005-05-24T23:03:39Z,2005-06-01T22:12:39Z
4,2005-05-24T23:04:41Z,2005-06-03T01:43:41Z
5,2005-05-24T23:05:21Z,2005-06-02T04:33:21Z


In [0]:
%sql
--Check sakila_dlh to make sure all desired tables are in it
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_payment,False
sakila_dlh,dim_rental,False
,display_query_1,True
,display_query_10,True
,display_query_11,True
,display_query_12,True
,display_query_13,True
,display_query_14,True


In [0]:
#Load dim_fact_orders 1,2, and 3 and their columns into the bronze table
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_orders_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT")
 .option("cloudFiles.schemaHints", "store_key BIGINT")
 .option("cloudFiles.schemaHints", "address_key BIGINT")
 .option("cloudFiles.schemaHints", "payment_key BIGINT")
 .option("cloudFiles.schemaHints", "staff_key BIGINT")
 .option("cloudFiles.schemaHints", "rental_key BIGINT")
 .option("cloudFiles.schemaHints", "payment_date_key DATE")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [0]:
%sql
/* Adding Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

In [0]:
#Use bronze_tempview to make fact_rentals_bronze
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7ff81589f820>

In [0]:
#Create tempview for silver table from bronze table
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

address_key,customer_key,fact_orders_key,payment_date_key,payment_key,rental_date_key,rental_key,return_date_key,staff_key,store_key,_rescued_data,receipt_time,source_file
5,1,1,,1,20050525,76,20050603,1,1,"{""payment_date_key"":20050525,""_file_path"":""dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json""}",2023-12-08T05:36:10.001Z,dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json
5,1,2,,2,20050528,573,20050603,1,1,"{""payment_date_key"":20050528,""_file_path"":""dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json""}",2023-12-08T05:36:10.001Z,dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json
6,2,3,,33,20050527,320,20050528,1,1,"{""payment_date_key"":20050527,""_file_path"":""dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json""}",2023-12-08T05:36:10.001Z,dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json
7,3,4,,60,20050527,435,20050602,1,1,"{""payment_date_key"":20050527,""_file_path"":""dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json""}",2023-12-08T05:36:10.001Z,dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json
7,3,5,,61,20050529,830,20050601,1,1,"{""payment_date_key"":20050529,""_file_path"":""dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json""}",2023-12-08T05:36:10.001Z,dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json
9,5,6,,108,20050529,731,20050530,1,1,"{""payment_date_key"":20050529,""_file_path"":""dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json""}",2023-12-08T05:36:10.001Z,dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json
10,6,7,,146,20050525,57,20050529,2,2,"{""payment_date_key"":20050525,""_file_path"":""dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json""}",2023-12-08T05:36:10.001Z,dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json
10,6,8,,147,20050528,577,20050601,1,2,"{""payment_date_key"":20050528,""_file_path"":""dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json""}",2023-12-08T05:36:10.001Z,dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json
10,6,9,,148,20050530,916,20050531,2,2,"{""payment_date_key"":20050530,""_file_path"":""dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json""}",2023-12-08T05:36:10.001Z,dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json
11,7,10,,174,20050525,46,20050602,2,1,"{""payment_date_key"":20050525,""_file_path"":""dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json""}",2023-12-08T05:36:10.001Z,dbfs:/FileStore/final_project_data/retail/stream/orders/sakila2_dim_fact_orders1.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
address_key,bigint,
customer_key,bigint,
fact_orders_key,bigint,
payment_date_key,date,
payment_key,bigint,
rental_date_key,bigint,
rental_key,bigint,
return_date_key,bigint,
staff_key,bigint,
store_key,bigint,


In [0]:
%sql
--Use tempview and dim_tables to get all desired columns into silver_tempview
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT rs.fact_orders_key
    , rs.rental_key
    , rs.customer_key
    , rs.store_key
    , rs.payment_key
    , rs.staff_key
    , rs.rental_date_key
    , rs.payment_date_key
    , p.amount AS amount_paid
    , c.first_name
    , c.last_name
    , c.email
    , c.active

FROM rentals_silver_tempview AS rs
INNER JOIN sakila_dlh.dim_payment AS p
ON p.payment_key = rs.payment_key
INNER JOIN sakila_dlh.dim_customer AS c
ON c.customer_key = rs.customer_key
INNER JOIN sakila_dlh.dim_rental AS r
ON r.rental_key = rs.rental_key
)

In [0]:
#Create fact_rentals_silver from tempview

(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7ff81589c6a0>

In [0]:
%sql
SELECT * FROM fact_rentals_silver
LIMIT 5

fact_orders_key,rental_key,customer_key,store_key,payment_key,staff_key,rental_date_key,payment_date_key,amount_paid,first_name,last_name,email,active
1,76,1,1,1,1,20050525,,2.99,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1
2,573,1,1,2,1,20050528,,0.99,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1
3,320,2,1,33,1,20050527,,4.99,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1
4,435,3,1,60,1,20050527,,1.99,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1
5,830,3,1,61,1,20050529,,2.99,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
fact_orders_key,bigint,
rental_key,bigint,
customer_key,bigint,
store_key,bigint,
payment_key,bigint,
staff_key,bigint,
rental_date_key,bigint,
payment_date_key,date,
amount_paid,double,
first_name,varchar(65535),


In [0]:
%sql
/*Create gold table, with only the information of the address key of the customer, where the purchase was made, and how much was spent.
This information could be useful for several reasons, such as finding out which stores have the largest spenders or where the most
frequent customers live */
CREATE OR REPLACE TABLE sakila_dlh.fact_gold_rentals AS (
  SELECT
    c.address_key,
    fr.store_key,
    p.amount
  FROM
    sakila_dlh.fact_rentals_silver fr
    JOIN sakila_dlh.dim_customer AS c ON fr.customer_key = c.customer_key
    JOIN sakila_dlh.dim_payment AS p ON fr.payment_key = p.payment_key
  WHERE
    fr.rental_date_key IS NOT NULL
);

SELECT * FROM sakila_dlh.fact_gold_rentals;

address_key,store_key,amount
5,1,0.99
5,1,2.99
6,1,4.99
7,1,2.99
7,1,1.99
9,1,0.99
10,2,0.99
10,2,2.99
10,2,4.99
11,1,4.99
