## Jenny Schilling (xdj3kg)
## DS2002 Capstone Project

### Putting it All Together: Data Integration & Analysis
__Deliverable:__ Using Azure Databricks, design and populate a dimensional Data Lakehouse that represents a simple business process of your choosing. Examples might include retail sales, inventory, procurement, order management, transportation or hospitality bookings, medical appointments, student registration and/or attendance. You may select any business process that interests you, but remember that a dimensional Data Lakehouse provides for the post hoc summarization and historic analysis of business transactions that reflect the interaction between various entities (e.g., patients & doctors, retailers & customers, students & schools/classes, travelers & airlines/hotels).

The most straight-forward approach is to identify an existing OLTP example database wherein all required data relationships already exist; however, you may choose to populate your Data Lakehouse using data from multiple sources as long as you can successfully use their business keys (e.g., customer code, product code) to establish the appropriate relationships between the Fact and Dimension tables. Your project should demonstrate your understanding of the differing types of relational data systems (OLTP/OLAP), and how data can be extracted from various source systems (structured, semi-structured, unstructured), transformed (cleansed, integrated), and then loaded into a destination system that’s optimized for post hoc diagnostic analysis. Your project should also demonstrate your knowledge of data integration patterns like ETL, ELT and ELTL, and architectures (e.g., lambda or kappa) for integrating batch and real-time (streaming) data sources.

### Design Requirements:
Your solution (database schema) needn’t be complex, but should meet the following requirements:
- A Date dimension to enable the analysis of the business process over various intervals of time (the code for creating this in MySQL and Microsoft SQL Server has already been provided for you).
- At least 3 additional dimension tables (e.g., customers, products)
- At least 1 fact table that models the business process (e.g., sales, reservations, bookings)
- Your solution must populate its dimensions using data originating from multiple sources:
  - A relational database like Azure MySQL, or Azure SQL Server
  - A NoSQL database like MongoDB Atlas, or Azure Cosmos DB
  - Files (e.g., CSV) from a cloud-based file system; like the Databricks File System (DBFS)
- Your solution must integrate data of differing granularity (i.e., static and near real-time)
- Your solution must include results that demonstrate the business value of your solution. For example, a query (SELECT statement) that summarizes transaction details by customer, product, etc.
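
To illustrate what a Date dimension row carries, here is a minimal pure-Python sketch that builds one row per calendar day. The column names mirror the `date_dim` columns used in the Silver join later in this notebook; the `build_date_dim` helper and the `date_key` format are assumptions made for illustration and are not the provided SQL script.

```python
from datetime import date, timedelta

def build_date_dim(start, end):
    """Return one dict per calendar day from start to end, inclusive."""
    rows = []
    current = start
    while current <= end:
        rows.append({
            "date_key": int(current.strftime("%Y%m%d")),  # assumed surrogate key format
            "full_date": current.isoformat(),
            "day_name_of_week": current.strftime("%A"),
            "day_of_month": current.day,
            "weekday_weekend": "Weekend" if current.weekday() >= 5 else "Weekday",
            "month_name": current.strftime("%B"),
            "calendar_quarter": (current.month - 1) // 3 + 1,
            "calendar_year": current.year,
        })
        current += timedelta(days=1)
    return rows

# One week of rows, starting on a Monday
date_rows = build_date_dim(date(2024, 1, 1), date(2024, 1, 7))
```

Because every attribute is derived from the date itself, the dimension can be generated once for the whole analysis window and joined to fact rows on the date column.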

### Functional Requirements:
1. Your solution must demonstrate at least one batch execution (i.e., use sample source data \[SQL, NoSQL and file system] to demonstrate loading at least one incremental data load).
2. Your solution must demonstrate accumulating data that originates from a real-time (streaming) data source for a predetermined interval (mini-batch), integrating it with reference data, and then using the product as a source for populating your dimensional Data Lakehouse. (i.e., implement the Databricks bronze, silver, gold architecture).
  a. Use the Spark AutoLoader to demonstrate integrating streaming data (using separate JSON files) for at least 3 intervals. This is most easily accomplished by segmenting the Fact table source data into 3 ranges and exporting them into 3 separate JSON files.
  b. Illustrate the relationships between the “real-time” fact data and the static reference data. This is accomplished by joining fact and dimension tables at the Silver table phase.
3. Use a Databricks Notebook to execute all data integration, object creation and query execution.
4. Please submit all code, and other artifacts, in a GitHub repository in your account.
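
Requirement 2a suggests segmenting the fact source data into three ranges and exporting each to its own JSON file. The sketch below shows one way this could be done in plain Python; the `split_into_json_files` helper, the file prefix, and the sample rows are hypothetical.

```python
import json
import os
import tempfile

def split_into_json_files(records, out_dir, parts=3, prefix="sales_fact"):
    """Write `records` to `parts` JSON files of near-equal size and return their paths."""
    os.makedirs(out_dir, exist_ok=True)
    size, extra = divmod(len(records), parts)
    paths, start = [], 0
    for i in range(parts):
        # The first `extra` files take one additional record each
        end = start + size + (1 if i < extra else 0)
        path = os.path.join(out_dir, f"{prefix}_{i + 1:02d}.json")
        with open(path, "w") as fh:
            json.dump(records[start:end], fh, indent=2)
        paths.append(path)
        start = end
    return paths

# Hypothetical fact rows; real rows would come from the source fact table
sample_sales = [
    {"sale_id": i, "customer_id": 100 + i % 4, "product_id": 10 + i % 3, "quantity_sold": 1 + i % 5}
    for i in range(1, 11)
]
json_paths = split_into_json_files(sample_sales, tempfile.mkdtemp())
```

Each file holds a JSON array that spans several lines, so a reader that expects one record per line would need a multi-line option enabled.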

Imports

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

Instantiate Global Variables

In [0]:
jdbc_hostname = "xdj3kg-ds2002-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "retail_sales_dw"

connection_properties = {
  "user" : "xdj3kg",
  "password" : "Passw0rd123!",
  "driver" : "org.mariadb.jdbc.Driver"
}

atlas_cluster_name = "cluster0.2ihqfeb"
atlas_database_name = "retail_sales_dw"
atlas_user_name = "xdj3kg"
atlas_password = "Passw0rd123!"

dst_database = "retail_sales_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

sales_stream_dir = f"{stream_dir}/sales"

sales_output_bronze = f"{database_dir}/sales_fact/bronze"
sales_output_silver = f"{database_dir}/sales_fact/silver"
sales_output_gold   = f"{database_dir}/sales_fact/gold"

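# Start from a clean slate: remove any output left over from a previous run
# (removing database_dir also removes the sales_fact directory beneath it)
dbutils.fs.rm(database_dir, True)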

Define global functions

In [0]:
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    # Create a client connection to MongoDB
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    # Query MongoDB; an empty filter ({}) and a projection of None return whole documents
    db = client[db_name]
    cursor = db[collection].find(conditions or {}, projection or None)
    if sort:
        cursor = cursor.sort(sort)

    # Fill a Python list with the documents and use it to create a DataFrame
    dframe = pd.DataFrame(list(cursor))

    client.close()
    
    return dframe

def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    # Read each JSON file and use it to (re)create the collection named by its key
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(src_file_path, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        result = db[collection_name].insert_many(json_object)

    client.close()
    
    return result
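
Both functions above interpolate the user name and password directly into the connection string. PyMongo expects reserved characters in credentials (such as `@`, `:` or `/`) to be percent-escaped, so a small helper like the one below may be safer. `build_mongo_uri` and the sample credentials are hypothetical and not part of the original notebook.

```python
from urllib.parse import quote_plus

def build_mongo_uri(user_id, pwd, cluster_name, db_name):
    """Build a mongodb+srv URI with percent-escaped credentials."""
    return (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )

# Sample values for illustration only
uri = build_mongo_uri("app_user", "p@ss/word", "cluster0.example", "retail_sales_dw")
```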

### Populate Dimensions by Ingesting Reference (Cold-path) Data
Create a new Databricks metadata database

In [0]:
%sql
DROP DATABASE IF EXISTS retail_sales_dlh CASCADE;
CREATE DATABASE IF NOT EXISTS retail_sales_dlh
COMMENT "JS Capstone Project Database"
LOCATION "dbfs:/FileStore/lab_data/retail_sales_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS2002 Capstone Project");

Create new table that sources **date** dimension data from a table in Azure MySQL database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://xdj3kg-ds2002-mysql.mysql.database.azure.com:3306/retail_sales_dw2",
  dbtable "date_dim",
  user "xdj3kg",
  password "Passw0rd123!"
);

USE DATABASE retail_sales_dlh;

CREATE OR REPLACE TABLE retail_sales_dlh.date_dim
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/retail_sales_dlh/date_dim"
AS SELECT * FROM view_date;

DESCRIBE EXTENDED retail_sales_dlh.date_dim;
SELECT * FROM retail_sales_dlh.date_dim LIMIT 5;

Create new table that sources **customer** dimension data from a table in Azure MySQL database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://xdj3kg-ds2002-mysql.mysql.database.azure.com:3306/retail_sales_dw2",
  dbtable "customer_dim",
  user "xdj3kg",
  password "Passw0rd123!"
);

USE DATABASE retail_sales_dlh;

CREATE OR REPLACE TABLE retail_sales_dlh.customer_dim
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/retail_sales_dlh/customer_dim"
AS SELECT * FROM view_customer;

DESCRIBE EXTENDED retail_sales_dlh.customer_dim;
SELECT * FROM retail_sales_dlh.customer_dim LIMIT 5;

Create new table that sources **product** dimension data from a table in Azure MySQL database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://xdj3kg-ds2002-mysql.mysql.database.azure.com:3306/retail_sales_dw2",
  dbtable "product_dim",
  user "xdj3kg",
  password "Passw0rd123!"
);

USE DATABASE retail_sales_dlh;

CREATE OR REPLACE TABLE retail_sales_dlh.product_dim
COMMENT "Product Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/retail_sales_dlh/product_dim"
AS SELECT * FROM view_product;

DESCRIBE EXTENDED retail_sales_dlh.product_dim;
SELECT * FROM retail_sales_dlh.product_dim LIMIT 5;

### Fetch reference data from a NoSQL MongoDB Atlas Database
This section fetches product review data from the MongoDB collection, and loads that data into the Data Lakehouse product dimension by the product key.

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/lab_data/retail/batch'

source_dir = '/dbfs/FileStore/lab_data/retail/batch'
json_files = {"reviews" : 'retil_sales_dw.product_reviews.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

In [0]:
%scala
import com.mongodb.spark._

val userName = "xdj3kg"
val pwd = "Passw0rd123!"
val clusterName = "cluster0.2ihqfeb"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

// Read from the database and collection that set_mongo_collection() populated above
val df_reviews = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "retail_sales_dw")
  .option("collection", "reviews").load()
  .select("id","product_key","reviewer","review_text","rating")

display(df_reviews)
df_reviews.printSchema()

// Stage the reviews in their own table; writing them to product_dim would replace the dimension
df_reviews.write.format("delta").mode("overwrite").saveAsTable("retail_sales_dlh.temporary_table_for_updates")

In [0]:
%sql
USE DATABASE retail_sales_dlh;
ALTER TABLE product_dim ADD COLUMNS (reviewer STRING, review_text STRING, rating INT);

In [0]:
%sql
-- Spark SQL's UPDATE has no JOIN clause, so MERGE applies the staged reviews by product key.
-- Assumes at most one staged review per product_key; MERGE fails if several source rows match one target row.
MERGE INTO retail_sales_dlh.product_dim AS pd
USING retail_sales_dlh.temporary_table_for_updates AS tmp
ON pd.product_key = tmp.product_key
WHEN MATCHED THEN UPDATE SET
  pd.reviewer = tmp.reviewer,
  pd.review_text = tmp.review_text,
  pd.rating = tmp.rating;

DESCRIBE EXTENDED retail_sales_dlh.product_dim;

SELECT * FROM retail_sales_dlh.product_dim LIMIT 5;

### Including CSV files from a cloud-based file system: Databricks File System (DBFS)
Adds a VIP flag for each customer from 'dbfs:/FileStore/lab_data/retail/batch/customer_vip_info.csv' and matches it to the customer dimension table on first and last name.
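
The matching step is a left join on the name columns: every customer is kept, and customers that do not appear in the CSV end up with no VIP flag. The following pure-Python sketch shows that behaviour on made-up rows; the sample data and variable names are hypothetical.

```python
import csv
import io

# Hypothetical stand-ins for customer_dim rows and the VIP CSV contents
customers = [
    {"customer_id": 1, "first_name": "Ana", "last_name": "Lopez"},
    {"customer_id": 2, "first_name": "Ben", "last_name": "Kim"},
]
vip_csv = "first_name,last_name,is_vip\nAna,Lopez,true\n"

# Index VIP flags by (first_name, last_name), converting the text flag to a bool
vip_by_name = {
    (row["first_name"], row["last_name"]): row["is_vip"].strip().lower() == "true"
    for row in csv.DictReader(io.StringIO(vip_csv))
}

# Left join: every customer is kept; customers missing from the CSV get None
customers_with_vip = [
    {**c, "is_vip": vip_by_name.get((c["first_name"], c["last_name"]))}
    for c in customers
]
```

Joining on names is fragile when two customers share a name, so a business key such as a customer code would be a more reliable join column if the CSV provided one.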

In [0]:
!pip install pymysql

In [0]:
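customer_vip_data = f"{batch_dir}/customer_vip_info.csv"

df_vip = spark.read.format('csv').options(header='true', inferSchema='true').load(customer_vip_data)
# Cast the VIP flag to a boolean and expose the DataFrame to SQL cells as a temporary view
df_vip = df_vip.withColumn("is_vip", col("is_vip").cast("boolean"))
df_vip.createOrReplaceTempView("df_vip")
display(df_vip)

In [0]:
%sql
USE DATABASE retail_sales_dlh;
CREATE OR REPLACE TEMPORARY VIEW view_customer_vip AS
SELECT
    c.customer_id,
    c.first_name,
    c.last_name,
    c.phone,
    c.credit_limit,
    v.is_vip
FROM
    customer_dim AS c
LEFT JOIN
    df_vip AS v
ON
    c.first_name = v.first_name AND c.last_name = v.last_name;

SELECT * FROM view_customer_vip;

In [0]:
# Write the joined result, not the raw CSV, so customer_dim keeps its columns and gains is_vip
(spark.table("view_customer_vip")
      .write.format("delta").mode("overwrite").option("overwriteSchema", "true")
      .saveAsTable("retail_sales_dlh.customer_dim"))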

In [0]:
%sql
DESCRIBE EXTENDED retail_sales_dlh.customer_dim;
SELECT * FROM retail_sales_dlh.customer_dim LIMIT 5;

### Integrating data of differing granularity
#### Sales Fact Table that models the business process
Uses Auto Loader to process streaming (hot-path) sales fact data and implements the Databricks bronze, silver, and gold architecture.

##### Bronze Table: process 'raw' JSON data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", sales_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(sales_stream_dir)
 .createOrReplaceTempView("sales_raw_tempview"))

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW sales_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM sales_raw_tempview
);

SELECT * FROM sales_bronze_tempview;

In [0]:
(spark.table("sales_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{sales_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("sales_fact_bronze"))

##### Silver Table: include reference data

In [0]:
(spark.readStream
  .table("sales_fact_bronze")
  .createOrReplaceTempView("sales_silver_tempview"))

In [0]:
%sql
SELECT * FROM sales_silver_tempview;
DESCRIBE EXTENDED sales_silver_tempview;

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW sales_fact_silver_tempview AS (
  SELECT s.sale_id,
    s.sale_date,
    d.day_name_of_week AS sale_day_name_of_week,
    d.day_of_month AS sale_day_of_month,
    d.weekday_weekend AS sale_weekday_weekend,
    d.month_name AS sale_month_name,
    d.calendar_quarter AS sale_quarter,
    d.calendar_year AS sale_year,
    s.product_id,
    p.product_name AS sale_product_name,
    p.product_line AS sale_product_line,
    p.quantity_in_stock AS sale_quantity_instock,
    p.product_price AS sale_product_price,
    p.reviewer AS sale_review,
    p.review_text AS sale_review_text,
    p.rating AS sale_rating,
    s.customer_id,
    c.first_name AS sale_customer_firstname,
    c.last_name AS sale_customer_lastname,
    c.phone AS sale_customer_phone,
    c.credit_limit AS sale_customer_credit_limit,
    c.is_vip AS sale_customer_isvip,
    s.quantity_sold,
    s.sales_amount
  FROM sales_silver_tempview AS s
  INNER JOIN retail_sales_dlh.customer_dim AS c
  ON c.customer_id = s.customer_id
  INNER JOIN retail_sales_dlh.product_dim AS p
  ON p.product_id = s.product_id
  INNER JOIN retail_sales_dlh.date_dim AS d
  ON d.full_date = s.sale_date
)

In [0]:
(spark.table("sales_fact_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{sales_output_silver}/_checkpoint")
      .outputMode("append")
      .table("sales_fact_silver"))

In [0]:
%sql
SELECT * FROM sales_fact_silver;
DESCRIBE EXTENDED retail_sales_dlh.sales_fact_silver

##### Gold Table: perform aggregations using the CTAS approach
This table includes the number of products sold per customer each month, along with the customer's ID, first & last name, and the month in which the sale was made.
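
As a rough illustration of what this aggregation computes, the sketch below reproduces a `COUNT(product_id)` grouped by customer and month on a few made-up Silver rows. The sample rows are hypothetical, and the grouping is reduced to customer ID and month for brevity.

```python
from collections import Counter

# Hypothetical silver rows, one per sale line
silver_rows = [
    {"customer_id": 1, "sale_month_name": "January", "product_id": 10},
    {"customer_id": 1, "sale_month_name": "January", "product_id": 11},
    {"customer_id": 1, "sale_month_name": "February", "product_id": 10},
    {"customer_id": 2, "sale_month_name": "January", "product_id": 12},
]

# COUNT(product_id) ... GROUP BY customer, month
product_counts = Counter(
    (r["customer_id"], r["sale_month_name"])
    for r in silver_rows
    if r["product_id"] is not None  # COUNT(column) skips NULL values
)

# ORDER BY ProductCount DESC
ranked = product_counts.most_common()
```

Note that `COUNT(product_id)` counts sale rows rather than units; summing `quantity_sold` would measure units sold instead.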

In [0]:
%sql
CREATE OR REPLACE TABLE retail_sales_dlh.fact_monthly_sales_by_customer_gold AS (
  SELECT customer_id AS CustomerID
    , sale_customer_lastname AS LastName
    , sale_customer_firstname AS FirstName
    , sale_month_name AS SaleMonth
    , COUNT(product_id) AS ProductCount
  FROM retail_sales_dlh.sales_fact_silver
  GROUP BY CustomerID, LastName, FirstName, SaleMonth
  ORDER BY ProductCount DESC);

SELECT * FROM retail_sales_dlh.fact_monthly_sales_by_customer_gold;

In [0]:
%sql
CREATE OR REPLACE TABLE retail_sales_dlh.fact_product_sales_by_customer_gold AS (
  SELECT pc.CustomerID
    , ss.sale_customer_lastname AS CustomerName
    , ss.product_id AS ProductCode
    , pc.ProductCount
  FROM retail_sales_dlh.sales_fact_silver AS ss
  INNER JOIN (
    SELECT customer_id AS CustomerID
    , COUNT(product_id) AS ProductCount
    FROM retail_sales_dlh.sales_fact_silver
    GROUP BY customer_id
  ) AS pc
  ON pc.CustomerID = ss.customer_id
  ORDER BY ProductCount DESC);

SELECT * FROM retail_sales_dlh.fact_product_sales_by_customer_gold;

### SQL query to demonstrate the business value
This SQL query calculates the total sales quantity (using SUM), grouping by 'is_vip' and 'product_name' to see sales quantities for each product broken down by VIP status. I sort by total_quantity_sold in descending order to find out the top-selling products.
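
To try the same grouping logic outside Databricks, the sketch below runs an equivalent query against toy tables in an in-memory SQLite database. The table contents are made up, and SQLite stands in for Spark SQL only to show the shape of the result.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customer_dim (customer_id INTEGER, is_vip INTEGER);
CREATE TABLE product_dim  (product_id INTEGER, product_name TEXT);
CREATE TABLE sales_fact   (customer_id INTEGER, product_id INTEGER, quantity_sold INTEGER);
INSERT INTO customer_dim VALUES (1, 1), (2, 0);
INSERT INTO product_dim  VALUES (10, 'Widget'), (11, 'Gadget');
INSERT INTO sales_fact   VALUES (1, 10, 5), (1, 11, 2), (2, 10, 3), (2, 10, 4);
""")

# Total quantity per (VIP status, product), largest totals first
rows = conn.execute("""
SELECT c.is_vip, p.product_name, SUM(s.quantity_sold) AS total_quantity_sold
FROM sales_fact AS s
JOIN customer_dim AS c ON s.customer_id = c.customer_id
JOIN product_dim  AS p ON s.product_id  = p.product_id
GROUP BY c.is_vip, p.product_name
ORDER BY total_quantity_sold DESC
""").fetchall()
conn.close()
```

With this toy data, the non-VIP customer accounts for the largest Widget total, which is the kind of comparison the query is meant to surface.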

In [0]:
%sql
USE retail_sales_dlh;

SELECT
    c.is_vip,
    p.product_name,
    SUM(s.quantity_sold) AS total_quantity_sold
FROM sales_fact_silver AS s
JOIN customer_dim AS c ON s.customer_id = c.customer_id
JOIN product_dim AS p ON s.product_id = p.product_id
GROUP BY c.is_vip, p.product_name
ORDER BY total_quantity_sold DESC;

In [0]:
# Clean up the file system (a %fs magic command must be the first line of a cell, so dbutils is used here)
dbutils.fs.rm(base_dir, True)