In [0]:
#importing required libraries

import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

In [0]:
#instantiating global variables 

# Azure MySQL Server Connection Information ###################
# NOTE(review): the MySQL and MongoDB Atlas passwords below are hard-coded in the notebook;
# move them to a secret store (e.g. Databricks secrets or environment variables) before sharing.
jdbc_hostname = "tpq4ew-mysql.mysql.database.azure.com"  # same host as the JDBC URLs in the %sql cells
jdbc_port = 3306
src_database = "UnderwearData"

connection_properties = {
  "user" : "tpq4ew",
  "password" : "Rock27star",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster_name.cgnlbvh"
atlas_database_name = "UnderwearData"
atlas_user_name = "tpq4ew"
atlas_password = "Passw0rd123!"

# Data Files (JSON) Information ###############################
dst_database = "UnderwearData2"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

inventory_transactions_stream_dir = f"{stream_dir}/inventory_transactions"

inventory_transactions_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_transactions_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_transactions_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"


# Delete output from any previous run #########################
# One recursive delete of database_dir is enough: the streaming bronze/silver/gold
# output all lives underneath it, in {database_dir}/fact_inventory_transactions.
dbutils.fs.rm(database_dir, True)

In [0]:
#Define global functions

##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Parameters
    ----------
    user_id, pwd : str
        Atlas credentials. They are percent-escaped before being placed in the
        connection URI, so characters such as '@', ':' or '/' are safe.
    cluster_name : str
        Cluster host prefix; the URI host is ``{cluster_name}.mongodb.net``.
    db_name : str
        Database to query.
    collection : str
        Name of the collection to query.
    conditions : dict or None
        Query filter; a falsy value matches every document.
    projection : dict or None
        Fields to include/exclude; a falsy value returns whole documents.
    sort : sort specification or None
        Passed straight to ``Cursor.sort`` (e.g. a list of ``(key, direction)``
        pairs); a falsy value leaves the results unsorted.

    Returns
    -------
    DataFrame
        Built with the module-level ``pd.DataFrame`` from the list of fetched documents.
    '''
    # Local import keeps this helper self-contained within the notebook cell.
    from urllib.parse import quote_plus

    # Credentials must be percent-escaped or special characters corrupt the URI.
    mongo_uri = (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
                 f"@{cluster_name}.mongodb.net/{db_name}")
    client = pymongo.MongoClient(mongo_uri)
    try:
        db = client[db_name]
        # Apply each optional argument independently, so a filter or sort is honoured
        # even when the other optional arguments are omitted.
        cursor = db[collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        dframe = pd.DataFrame(list(cursor))
    finally:
        # Always release the connection, even if the query raises.
        client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''(Re)create MongoDB Atlas collections from local JSON files.

    For every ``collection_name -> file_name`` entry in ``json_files`` the existing
    collection is dropped, the JSON file ``os.path.join(src_file_path, file_name)`` is
    read with Python's ``open`` (so ``src_file_path`` must be a local/FUSE path), and
    its documents are inserted with ``insert_many``.

    Parameters
    ----------
    user_id, pwd : str
        Atlas credentials; percent-escaped before being placed in the URI.
    cluster_name : str
        Cluster host prefix; the URI host is ``{cluster_name}.mongodb.net``.
    db_name : str
        Target database.
    src_file_path : str
        Directory containing the JSON files.
    json_files : dict
        Maps collection name -> JSON file name (relative to ``src_file_path``).
        Each file may hold a list of documents or a single document object.

    Returns
    -------
    The ``insert_many`` result for the last collection that received documents,
    or ``None`` when nothing was inserted (e.g. ``json_files`` is empty).
    '''
    # Local import keeps this helper self-contained within the notebook cell.
    from urllib.parse import quote_plus

    # Credentials must be percent-escaped or special characters corrupt the URI.
    mongo_uri = (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
                 f"@{cluster_name}.mongodb.net/{db_name}")
    client = pymongo.MongoClient(mongo_uri)
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)
            # insert_many needs a non-empty list; accept a single top-level object too.
            if isinstance(documents, dict):
                documents = [documents]
            if documents:
                result = db[collection_name].insert_many(documents)
    finally:
        # Always release the connection, even if reading or inserting fails.
        client.close()

    return result

Populate Dimensions by Ingesting Reference (Cold-path) Data 


In [0]:
%sql
-- Fetch reference data from the Azure MySQL database.
-- Start from a clean slate: drop the Databricks metadata database UnderwearData2
-- (CASCADE also drops every table it contains) if it already exists.
DROP DATABASE IF EXISTS UnderwearData2 CASCADE;

In [0]:
%sql
-- Create the Databricks metadata database UnderwearData2, stored under
-- dbfs:/FileStore/lab_data/UnderwearData2 and tagged with project properties.
CREATE DATABASE IF NOT EXISTS UnderwearData2
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/lab_data/UnderwearData2"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

In [0]:
%sql
-- Create a temporary view that reads the dim_date table of the Azure MySQL
-- database UnderwearData through Spark's JDBC data source.
-- NOTE(review): the password is hard-coded here; prefer a secret store before sharing.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://tpq4ew-mysql.mysql.database.azure.com:3306/UnderwearData", -- Replace with your server name
  dbtable "dim_date",
  user "tpq4ew",         -- Replace with your user name
  password "Rock27star"  -- Replace with your password
)

In [0]:
%sql
-- Materialize the date dimension: copy every row of view_date (Azure MySQL)
-- into the table underweardata2.dim_date at the given DBFS location.
USE DATABASE underweardata2;

CREATE OR REPLACE TABLE underweardata2.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/UnderwearData2/dim_date"
AS SELECT * FROM view_date

In [0]:
%sql
-- Show the columns and extended table metadata of the date dimension.
DESCRIBE EXTENDED
  UnderwearData2.dim_date;

In [0]:
%sql
-- Preview five rows of the date dimension.
SELECT *
FROM UnderwearData2.dim_date
LIMIT 5

In [0]:
%sql
-- Create a temporary view named "view_product" that reads the dim_products table
-- of the Azure MySQL database UnderwearData through Spark's JDBC data source.
-- NOTE(review): the password is hard-coded here; prefer a secret store before sharing.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://tpq4ew-mysql.mysql.database.azure.com:3306/UnderwearData", -- Replace with your server name
  dbtable "dim_products",
  user "tpq4ew",         -- Replace with your user name
  password "Rock27star"  -- Replace with your password
)

In [0]:
%sql
-- Materialize the product dimension: copy every row of view_product (Azure MySQL)
-- into the table underweardata2.dim_product at the given DBFS location.
USE DATABASE underweardata2;

CREATE OR REPLACE TABLE underweardata2.dim_product
COMMENT "Products Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/UnderwearData2/dim_product"
AS SELECT * FROM view_product

In [0]:
%sql
-- Show the columns and extended table metadata of the product dimension.
DESCRIBE EXTENDED
  UnderwearData2.dim_product;

In [0]:
%sql
-- Preview five rows of the product dimension.
SELECT *
FROM UnderwearData2.dim_product
LIMIT 5

In [0]:
# Load the purchase-orders reference data from the batch CSV file on DBFS.
# The header row supplies column names and Spark infers each column's type.
purchase_orders_csv = f"{batch_dir}/purchase_orders.csv"

df_purchase_orders = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(purchase_orders_csv)
)
display(df_purchase_orders)

In [0]:
# Print the schema Spark inferred from the purchase-orders CSV (inferSchema='true').
df_purchase_orders.printSchema()

In [0]:
# Persist the purchase orders as the Delta table UnderwearData2.dim_purchase_orders,
# overwriting any table left by a previous run.
(df_purchase_orders.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("UnderwearData2.dim_purchase_orders"))

In [0]:
%sql
-- Show the columns and extended table metadata of the purchase-orders dimension.
DESCRIBE EXTENDED
  UnderwearData2.dim_purchase_orders;

In [0]:
%sql
-- Preview five rows of the purchase-orders dimension.
SELECT *
FROM UnderwearData2.dim_purchase_orders
LIMIT 5;

In [0]:
# Load the suppliers reference data from the batch CSV file on DBFS.
# The header row supplies column names and Spark infers each column's type.
suppliers_csv = f"{batch_dir}/suppliers.csv"

df_suppliers = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(suppliers_csv)
)
display(df_suppliers)

In [0]:
# Print the schema Spark inferred from the suppliers CSV (inferSchema='true').
df_suppliers.printSchema()

In [0]:
# Persist the suppliers as the Delta table UnderwearData2.dim_suppliers,
# overwriting any table left by a previous run.
(df_suppliers.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("UnderwearData2.dim_suppliers"))

In [0]:

%sql
-- Show the columns and extended table metadata of the suppliers dimension.
DESCRIBE EXTENDED
  UnderwearData2.dim_suppliers;

In [0]:
%sql
-- Preview five rows of the suppliers dimension.
SELECT *
FROM UnderwearData2.dim_suppliers
LIMIT 5;

In [0]:

%sql
-- Switch to the project database and list the tables created so far.
USE UnderWearData2;

SHOW TABLES

In [0]:
# List the batch data files on DBFS
# (batch_dir is dbfs:/FileStore/lab_data/retail/batch, i.e. /dbfs/FileStore/lab_data/retail/batch via FUSE).
display(
    dbutils.fs.ls(batch_dir)
)

In [0]:
# Create a New MongoDB Database, and Load JSON Data with inventory transaction dimension table Into a New MongoDB Collection
#
# set_mongo_collection reads the file with Python's open(), which needs the local FUSE
# path (/dbfs/...) rather than the dbfs:/ URI used by Spark. Derive it from batch_dir
# so the two locations cannot drift apart.
source_dir = batch_dir.replace("dbfs:/", "/dbfs/", 1)
json_files = {"dim_inventory_transactions" : 'Underwear_dim_inventory_transactions.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

In [0]:
%scala
// Fetch the newly created inventory-transactions dimension data from the MongoDB collection.
// NOTE(review): userName / pwd / clusterName are placeholders. Set them to the same values as
// atlas_user_name / atlas_password / atlas_cluster_name in the Python configuration cell
// (Scala cells cannot read Python variables), ideally from a secret store.
import com.mongodb.spark._

val userName = "user_name"
val pwd = "password"
val clusterName = "cluster_name.xxxxx"
// Percent-encode the credentials so characters such as '@', ':' or '/' cannot break the URI.
val encodedUser = java.net.URLEncoder.encode(userName, "UTF-8")
val encodedPwd = java.net.URLEncoder.encode(pwd, "UTF-8")
val atlas_uri = s"mongodb+srv://$encodedUser:$encodedPwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala
// Load the dim_inventory_transactions collection of the UnderwearData MongoDB database
// through the MongoDB Spark connector, keeping only the columns listed below.
val df_inventory_transactions = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "UnderwearData")
  .option("collection", "dim_inventory_transactions")
  .option("uri", atlas_uri)
  .load()
  .select(
    "TransactionID",
    "ProductID",
    "PurchaseOrderID",
    "MissingID",
    "TransactionDate",
    "UnitPurchasePrice",
    "QuantityOrdered",
    "QuantityReceived",
    "QuantityMissing"
  )

display(df_inventory_transactions)

In [0]:
%scala
// Print the schema of the DataFrame loaded from the MongoDB collection.
df_inventory_transactions.printSchema()

In [0]:
%scala
// Use the Spark DataFrame to create the inventory-transactions dimension table (Delta format)
// in the Databricks metadata database UnderwearData2, overwriting any earlier version.
df_inventory_transactions.write.format("delta").mode("overwrite").saveAsTable("UnderwearData2.dim_inventory_transactions")

In [0]:
%sql
-- Show the columns and extended table metadata of the inventory-transactions dimension.
DESCRIBE EXTENDED
  UnderwearData2.dim_inventory_transactions

In [0]:
%sql
-- Preview five rows of the inventory-transactions dimension.
SELECT *
FROM UnderwearData2.dim_inventory_transactions
LIMIT 5

Integrate Reference with Real-Time Data

In [0]:
# Use Auto Loader to process the streaming (hot-path) inventory-transactions fact data.
# Bronze stage: ingest the raw JSON files as-is and expose them as a temp view.
# Auto Loader keeps its inferred schema under cloudFiles.schemaLocation.
#
# To pin column types, pass ONE comma-separated schemaHints string; repeated
# .option("cloudFiles.schemaHints", ...) calls overwrite each other. For example:
#   .option("cloudFiles.schemaHints",
#           "TransactionID BIGINT, ProductID BIGINT, PurchaseOrderID BIGINT, MissingID BIGINT, "
#           "TransactionDate DATE, UnitPurchasePrice DECIMAL, QuantityOrdered BIGINT, "
#           "QuantityReceived BIGINT, QuantityMissing BIGINT")
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", inventory_transactions_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(inventory_transactions_stream_dir)
 .createOrReplaceTempView("inventory_transactions_raw_tempview"))

In [0]:
%sql
/* Stamp every raw record with its ingestion time and originating file for traceability */
CREATE OR REPLACE TEMPORARY VIEW inventory_transactions_bronze_tempview AS
SELECT
  *,
  current_timestamp() AS receipt_time,
  input_file_name()   AS source_file
FROM inventory_transactions_raw_tempview

In [0]:
%sql
-- Inspect the traceability-stamped bronze records (the view defined in the previous cell).
SELECT * FROM inventory_transactions_bronze_tempview

In [0]:
# Bronze stage: continuously append the traceability-stamped records to a Delta table.
# The checkpoint is kept under the bronze output directory so the stream can resume.
(spark.table("inventory_transactions_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inventory_transactions_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_transactions_bronze"))

In [0]:
# Creating silver table that includes reference data 
(spark.readStream
  .table("fact_inventory_transactions_bronze")
  .createOrReplaceTempView("inventory_transactions_silver_tempview"))

In [0]:
%sql
-- Inspect the streamed bronze records before they are enriched.
SELECT *
FROM inventory_transactions_silver_tempview

In [0]:
%sql
-- Show the columns of the streamed bronze view (used to write the silver join below).
DESCRIBE EXTENDED
  inventory_transactions_silver_tempview

In [0]:
%sql
-- Silver stage: enrich each streamed inventory transaction with product, purchase-order,
-- supplier and order-date reference data.
-- NOTE(review): dimension column names (productName, monthName, dd.OrderDate, ...) and
-- it.fact_inventory_transactions_key are kept from the original query and are not visible in
-- this notebook; confirm them against the DESCRIBE EXTENDED output of each table.
CREATE OR REPLACE TEMPORARY VIEW fact_inventory_transactions_silver_tempview AS (
  SELECT it.fact_inventory_transactions_key,
    it.TransactionID,
    it.productID,
    p.productName AS product_name,
    p.color AS product_color,
    p.modelDescription AS product_model_description,
    p.fabricDescription AS product_fabric_description,
    p.category AS product_category,
    p.gender AS product_gender,
    p.productLine AS product_product_line,
    p.weight AS product_weight,
    p.size AS product_size,
    p.packSize AS product_packSize,
    p.status AS product_status,
    p.inventoryDate AS product_inventory_date,
    it.purchaseOrderID,
    po.supplierID,
    s.supplierName AS supplier_name,
    po.employeeID AS purchaseorder_employeeID,
    po.shippingMethodID,
    po.orderDate,
    dd.day_name_of_week AS order_day_name_of_week,
    dd.day_of_month AS order_day_of_month,
    dd.weekday_weekend AS order_weekday_weekend,
    dd.monthName AS order_month_name,
    dd.calendar_quarter AS order_quarter,
    dd.calendar_year AS order_year,
    it.missingID,
    it.transactionDate,
    it.UnitpurchasePrice,
    it.quantityOrdered,
    it.quantityReceived,
    it.quantityMissing
  FROM inventory_transactions_silver_tempview AS it
  -- The product dimension is created as dim_product (singular) earlier in this notebook.
  INNER JOIN UnderwearData2.dim_product AS p
    ON p.productID = it.productID
  INNER JOIN UnderwearData2.dim_purchase_orders AS po
    ON po.purchaseOrderID = it.purchaseOrderID
  -- The supplier and order date come from the purchase order, not from the transaction.
  INNER JOIN UnderwearData2.dim_suppliers AS s
    ON s.supplierID = po.supplierID
  LEFT OUTER JOIN UnderwearData2.dim_date AS dd
    ON dd.OrderDate = po.orderDate
)

In [0]:
# Silver stage: continuously append the enriched transactions to a Delta table.
# The checkpoint is kept under the silver output directory so the stream can resume.
(spark.table("fact_inventory_transactions_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inventory_transactions_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_transactions_silver"))

In [0]:
%sql
-- Inspect the enriched rows written by the silver stream.
SELECT *
FROM fact_inventory_transactions_silver

In [0]:
%sql
-- Show the columns and extended table metadata of the silver fact table.
DESCRIBE EXTENDED
  UnderwearData2.fact_inventory_transactions_silver

In [0]:

%sql
/* Creating the Gold table: total quantity ordered per product per order month.
   Column names match the silver table (product_name, order_month_name, quantityOrdered);
   every non-aggregated column is grouped, and quantities are summed rather than counted. */
CREATE OR REPLACE TABLE UnderwearData2.fact_monthly_orders_by_product_gold AS (
  SELECT productID AS product_key
    , product_name AS productName
    , order_month_name AS OrderMonth
    , SUM(quantityOrdered) AS QuantityOrdered
  FROM UnderwearData2.fact_inventory_transactions_silver
  GROUP BY productID, product_name, order_month_name
  ORDER BY QuantityOrdered DESC);

SELECT * FROM UnderwearData2.fact_monthly_orders_by_product_gold;