# Final Project

- The Final Project is quite similar to your Mid-Term Project with regard to its dimensional data:
    - The Date dimension, and one other dimension of your choosing must be extracted from the MySQL adventureworks database.
    - Data from at least one other dimension must be exported from the MySQL adventureworks database as a JSON file, which must then be uploaded to MongoDB to create a new database/collection.  
    - Your final project notebook must then extract the data from that MongoDB database/collection to create a new dimension in your data lakehouse.
    - Data from at least one other dimension must be exported from the MySQL adventureworks database as a CSV file, which must then be read from your local hard disk to create a new dimension in your data lakehouse.
- Data from your Fact Table must be exported from the MySQL adventureworks database into at least three (3) separate JSON files, which will then be read into a series of PySpark Streaming Dataframes (e.g., Bronze, Silver and Gold) to complete the Lambda architecture of your Data Lakehouse.  This solution should resemble what was demonstrated in Lab 6.

Overall goals:
- Design and populate a dimensional Data Lakehouse that represents a simple business process of your choosing.
- A dimensional Data Lakehouse supports the post hoc summarization and historical analysis of business transactions that reflect the interaction between various entities (e.g., patients & doctors, retailers & customers).
- Show how data can be extracted from various source systems (structured, semi-structured, unstructured), transformed (cleansed, integrated), and then loaded into a destination system that’s optimized for post hoc diagnostic analysis.

Checklist:
- A Date dimension to enable the analysis of the business process over various intervals of time (the code for creating this in MySQL and Microsoft SQL Server has already been provided for you).
- At least 3 additional dimension tables (e.g., customers, employees, products)
- At least 1 fact table that models the business process (e.g., sales, reservations, bookings)
- Your solution must populate its dimensions using data originating from multiple sources:
    - A relational database such as Azure MySQL or Azure SQL Server
    - A NoSQL database such as MongoDB Atlas or Azure Cosmos DB
    - Files (e.g., CSV) from a cloud-based file system such as the Databricks File System (DBFS)
- Your solution must integrate data of differing granularity (i.e., static and near real-time).
- Your solution must include results that demonstrate the business value of your solution. For example, a query (SELECT statement) that summarizes transaction details by customer, product, etc.
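
The last checklist item asks for a query that summarizes transactions by a dimension. The sketch below shows the general shape of such a query, using an in-memory SQLite database as a stand-in for the lakehouse. The table name `fact_sales_orders` and the sample rows are assumptions made for illustration; in the notebook the same SELECT would run through `spark.sql()` against the real tables.

```python
import sqlite3

# In-memory SQLite stands in for the lakehouse; table names and rows are illustrative only.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_customers (customer_key INTEGER PRIMARY KEY, AccountNumber TEXT);
    CREATE TABLE fact_sales_orders (SalesOrderID INTEGER, customer_key INTEGER, SubTotal REAL);
    INSERT INTO dim_customers VALUES (1, 'AW00000001'), (2, 'AW00000002');
    INSERT INTO fact_sales_orders VALUES (100, 1, 50.0), (101, 1, 25.0), (102, 2, 10.0);
""")

# Join the fact table to a dimension on its surrogate key, then aggregate per customer.
summary = conn.execute("""
    SELECT c.AccountNumber,
           COUNT(*)        AS order_count,
           SUM(f.SubTotal) AS total_sales
    FROM fact_sales_orders AS f
    JOIN dim_customers AS c ON c.customer_key = f.customer_key
    GROUP BY c.AccountNumber
    ORDER BY total_sales DESC
""").fetchall()
conn.close()

print(summary)
```

Joining on the surrogate key (`customer_key`) rather than the business key is what lets the fact table stay narrow while the descriptive attributes live in the dimension.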

1. Your solution must demonstrate at least one batch execution (i.e., use sample source data [SQL, NoSQL and file system] to demonstrate at least one incremental data load).
2. Your solution must demonstrate accumulating data that originates from a real-time (streaming) data source for a predetermined interval (mini-batch), integrating it with reference data, and then using the product as a source for populating your dimensional Data Lakehouse (i.e., implement the Databricks bronze, silver, gold architecture).
    a. Use the Spark AutoLoader to demonstrate integrating streaming data (using separate JSON files) for at least 3 intervals. This is most easily accomplished by segmenting the Fact table source data into 3 ranges and exporting them into 3 separate JSON files. 
    b. Illustrate the relationships between the “real-time” fact data and the static reference data. This is accomplished by joining fact and dimension tables at the Silver table phase.
3. Use a Databricks Notebook to execute all data integration, object creation and query execution.
4. Please submit all code, and other artifacts, in a GitHub repository in your account.
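
Requirement 2a suggests segmenting the fact data into three ranges and exporting each to its own JSON file. A minimal sketch of that step follows, assuming the fact rows have already been fetched as a list of dictionaries. The helper name `split_into_json_files` and the sample rows are hypothetical; only the `sales_orders_NN.json` naming mirrors the files used later in this notebook.

```python
import json
import math
import os
import tempfile


def split_into_json_files(rows, out_dir, prefix, parts=3):
    """Write `rows` (a list of dicts) into `parts` contiguous JSON array files."""
    os.makedirs(out_dir, exist_ok=True)
    chunk_size = math.ceil(len(rows) / parts) if rows else 0
    paths = []
    for i in range(parts):
        # Each file receives one contiguous range of the source rows.
        chunk = rows[i * chunk_size:(i + 1) * chunk_size]
        path = os.path.join(out_dir, f"{prefix}_{i + 1:02d}.json")
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(chunk, fh)
        paths.append(path)
    return paths


# Hypothetical sample rows standing in for the exported fact table.
sample_rows = [{"SalesOrderID": 43659 + n, "SubTotal": 100.0 + n} for n in range(10)]
demo_dir = tempfile.mkdtemp()
written = split_into_json_files(sample_rows, demo_dir, "sales_orders")
print([os.path.basename(p) for p in written])
```

Each file holds a single JSON array, which is why the streaming reader later in the notebook sets `multiLine` to `true`.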

In [1]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

C:\Users\Jennifer\spark-3.5.4-bin-hadoop3


In [2]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
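mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "adventureworks",
    "conn_props" : {
        "user" : "root",
        # Assumption: the password is supplied through a MYSQL_PASSWORD environment variable instead of being hard-coded.
        "password" : os.getenv("MYSQL_PASSWORD", ""),
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}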

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------

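mongodb_args = {
    "cluster_location" : "local", # "atlas"
    "user_name" : "phb8pt",
    # Assumption: the password is supplied through a MONGODB_PASSWORD environment variable instead of being hard-coded.
    "password" : os.getenv("MONGODB_PASSWORD", ""),
    "cluster_name" : "cluster0",
    "cluster_subnet" : "g5xao",
    "db_name" : "adventureworks",
    "collection" : "",
    "null_column_threshold" : 0.5
}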

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'data')
data_dir = os.path.join(base_dir, 'adventureworks')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

sales_orders_stream_dir = os.path.join(stream_dir, 'sales_orders')


# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "adventureworks_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

sales_orders_output_bronze = os.path.join(database_dir, 'sales_orders', 'bronze')
sales_orders_output_silver = os.path.join(database_dir, 'sales_orders', 'silver')
sales_orders_output_gold = os.path.join(database_dir, 'sales_orders', 'gold')


In [3]:
def get_file_info(path: str):
    '''Return a Pandas DataFrame listing the name, size and last modification time of each file in 'path'.'''
    file_sizes = []
    modification_times = []

    # Fetch each item in the directory, and filter out any directories.
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    # Populate lists with the Size and Last Modification DateTime for each file in the directory.
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batches")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    # Count the rows once (rather than once per column), and avoid dividing by zero.
    total_rows = df.count()
    if total_rows == 0:
        return df

    columns_with_nulls = [name for name in df.columns
                          if df.filter(df[name].isNull()).count() / total_rows > threshold]
    df_dropped = df.drop(*columns_with_nulls)
    
    return df_dropped
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    # Join the jar paths into the comma-separated string expected by 'spark.jars'.
    jars = ", ".join(spark_jars)
    
    sparkConf_args = {
        "app_name" : "PySpark Adventureworks Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars,
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : dict):
    '''Load each JSON file into a freshly dropped collection; 'json_files' maps collection name to file name.'''
    db = mongo_client[db_name]
    
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(data_directory, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            db[collection_name].insert_many(json_object)
        
    mongo_client.close()
    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe
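
One detail worth noting about `get_mongo_uri()`: MongoDB connection strings require the user name and password to be percent-encoded when they contain reserved characters such as `@` or `:`. The sketch below is a standalone variant that applies `urllib.parse.quote_plus` to the credentials. The function name `build_mongo_uri` and the sample credentials are hypothetical and are not part of the notebook's helpers.

```python
from urllib.parse import quote_plus


def build_mongo_uri(cluster_location, user_name="", password="",
                    cluster_name="", cluster_subnet=""):
    """Return a MongoDB connection string for a local or an Atlas cluster."""
    if cluster_location not in ("atlas", "local"):
        raise ValueError("cluster_location must be 'atlas' or 'local'.")
    if cluster_location == "local":
        return "mongodb://localhost:27017/"
    # Percent-encode credentials so characters like '@' or ':' cannot break the URI.
    user = quote_plus(user_name)
    secret = quote_plus(password)
    return f"mongodb+srv://{user}:{secret}@{cluster_name}.{cluster_subnet}.mongodb.net/"


# Hypothetical credentials, used only to show the encoding.
local_uri = build_mongo_uri("local")
atlas_uri = build_mongo_uri("atlas", "demo_user", "p@ss:word", "cluster0", "example")
print(atlas_uri)
```

The passwords used in this notebook contain no reserved characters, so the original helper behaves the same way for them.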

In [4]:
remove_directory_tree(database_dir)
#print(database_dir)

"Directory 'c:\\Users\\Jennifer\\Documents\\DS2002\\ds2002capstone\\project\\spark-warehouse\\adventureworks_dlh.db' does not exist."

Create a new Spark session

In [5]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")
mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark

Create a new metadata database

In [6]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Final Project Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Final Project');
"""
spark.sql(sql_create_db)

DataFrame[]

Preparing to extract dimensions from different data sources

# Extract dimensions from different data sources
1. df_products from a local CSV file
2. dim_date from MySQL
3. df_customers from MongoDB
4. df_employees from MongoDB
5. df_fact_order from streaming JSON files

## 1. df_products from local (.csv)

Use PySpark to Read Data from a **CSV file**

In [7]:
product_csv = os.path.join(batch_dir, 'products.csv')
df_dim_products = spark.read.format('csv').options(header='true', inferSchema='true').load(product_csv)
df_dim_products.toPandas().head(2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellEndDate,ModifiedDate
0,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,0.0,...,,0.0,0,,,,0,0,NaT,2004-03-11 10:01:36
1,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,0.0,...,,0.0,0,,,,0,0,NaT,2004-03-11 10:01:36


Make necessary transformations to the new dataframe

In [8]:
# ----------------------------------------------------------------------------------
# The CSV already names its business key 'ProductID' (there is no 'id' column),
# so no rename is needed before adding the surrogate key.
# ----------------------------------------------------------------------------------

# ----------------------------------------------------------------------------------
# Add Primary Key column using SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------
df_dim_products.createOrReplaceTempView("products")
sql_products = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY ProductID) AS product_key
    FROM products;
"""
df_dim_products = spark.sql(sql_products)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['product_key', 'ProductID', 'Name', 'ProductNumber', 
                   'MakeFlag', 'FinishedGoodsFlag', 'SafetyStockLevel', 
                   'ReorderPoint', 'StandardCost', 'ListPrice', 
                   'Size', 'SizeUnitMeasureCode', 'Color',
                   'WeightUnitMeasureCode', 'Weight', 
                   'DaysToManufacture', 'ProductLine', 
                   'Class', 'Style', 
                   'ProductSubcategoryID', 'ProductModelID', 
                   'ModifiedDate'
                   ]

df_dim_products = df_dim_products[ordered_columns]
df_dim_products.toPandas().head(2)

Unnamed: 0,product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,Color,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,ModifiedDate
0,1,1,Adjustable Race,AR-5381,0,0,1000,750,0.0,0.0,...,,,0.0,0,,,,0,0,2004-03-11 10:01:36
1,2,2,Bearing Ball,BA-8327,0,0,1000,750,0.0,0.0,...,,,0.0,0,,,,0,0,2004-03-11 10:01:36
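
The `ROW_NUMBER() OVER (ORDER BY ProductID)` step above assigns each row a surrogate key: a dense integer that is independent of the source system's business key. The plain-Python sketch below shows the same idea on a small list of dictionaries. The helper `add_surrogate_key` and the sample rows are hypothetical.

```python
def add_surrogate_key(rows, business_key, key_name):
    """Return copies of `rows` ordered by `business_key`, numbered from 1 in `key_name`."""
    ordered = sorted(rows, key=lambda row: row[business_key])
    # Placing the surrogate key first mirrors the column reordering done in the notebook.
    return [{key_name: n, **row} for n, row in enumerate(ordered, start=1)]


# Hypothetical product rows; the gap in ProductID shows that the keys stay dense.
products = [
    {"ProductID": 7, "Name": "Example Widget"},
    {"ProductID": 1, "Name": "Adjustable Race"},
    {"ProductID": 2, "Name": "Bearing Ball"},
]
keyed = add_surrogate_key(products, "ProductID", "product_key")
for row in keyed:
    print(row)
```

Because the numbering depends on sort order, re-running it after new rows arrive can shift existing keys, so this approach suits full reloads like the overwrite used here.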


In [9]:
df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

In [10]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_products;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_products LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|         product_key|      int|   NULL|
|           ProductID|      int|   NULL|
|                Name|   string|   NULL|
|       ProductNumber|   string|   NULL|
|            MakeFlag|      int|   NULL|
|   FinishedGoodsFlag|      int|   NULL|
|    SafetyStockLevel|      int|   NULL|
|        ReorderPoint|      int|   NULL|
|        StandardCost|   double|   NULL|
|           ListPrice|   double|   NULL|
|                Size|   string|   NULL|
| SizeUnitMeasureCode|   string|   NULL|
|               Color|   string|   NULL|
|WeightUnitMeasure...|   string|   NULL|
|              Weight|   double|   NULL|
|   DaysToManufacture|      int|   NULL|
|         ProductLine|   string|   NULL|
|               Class|   string|   NULL|
|               Style|   string|   NULL|
|ProductSubcategoryID|      int|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,Color,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,ModifiedDate
0,1,1,Adjustable Race,AR-5381,0,0,1000,750,0.0,0.0,...,,,0.0,0,,,,0,0,2004-03-11 10:01:36
1,2,2,Bearing Ball,BA-8327,0,0,1000,750,0.0,0.0,...,,,0.0,0,,,,0,0,2004-03-11 10:01:36


## 2. dim_date from MySQL
A copy of the date dimension is also included as dim_date.csv in case it is needed.

In [11]:
sql_dim_date = f"SELECT * FROM {mysql_args['db_name']}.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)

In [12]:
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

In [13]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|  tinyint|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|  tinyint|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|  tinyint|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|  tinyint|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|  tinyint|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|  tinyint|   NULL|
|      fiscal_quarter|  tinyint|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


## 3. df_customers and df_employees from MongoDB

In [14]:
client = get_mongo_client(**mongodb_args)

json_files = {"customers" : "customers.json",
              "employees" : 'employees.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], batch_dir, json_files) 

### 3a. df_customers

In [15]:
mongodb_args["collection"] = "customers"

df_dim_customers = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_customers.toPandas().head(2)

| | AccountNumber | CustomerID | CustomerType | ModifiedDate | TerritoryID |
|---|---|---|---|---|---|
| 0 | AW00000001 | 1 | S | 2004-10-13 11:15:07 | 1 |
| 1 | AW00000002 | 2 | S | 2004-10-13 11:15:07 | 1 |
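
`get_mongodb_dataframe()` also drops any column whose share of NULL values exceeds `null_column_threshold` (0.5 in this notebook). The rule itself can be sketched in plain Python as below, assuming every document carries the same keys. The function name, the `Region` column, and the sample documents are hypothetical.

```python
def columns_over_null_threshold(rows, threshold):
    """Return column names whose fraction of None values exceeds `threshold`."""
    if not rows:
        return []
    total = len(rows)
    null_counts = {}
    # A single pass over the rows counts the None values for every column.
    for row in rows:
        for name, value in row.items():
            null_counts[name] = null_counts.get(name, 0) + (value is None)
    return [name for name, nulls in null_counts.items() if nulls / total > threshold]


# Hypothetical documents: 'Region' is null in 2 of 3 rows, 'TerritoryID' in 1 of 3.
docs = [
    {"CustomerID": 1, "TerritoryID": 1, "Region": None},
    {"CustomerID": 2, "TerritoryID": None, "Region": None},
    {"CustomerID": 3, "TerritoryID": 4, "Region": "West"},
]
to_drop = columns_over_null_threshold(docs, 0.5)
print(to_drop)
```

The comparison is strict, so a column that is exactly half NULL is kept at a threshold of 0.5.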


In [16]:
# ----------------------------------------------------------------------------------
# The collection already names its business key 'CustomerID' (there is no 'id'
# column), so no rename is needed before adding the surrogate key.
# ----------------------------------------------------------------------------------

# ----------------------------------------------------------------------------------
# Add Primary Key column using SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------
df_dim_customers.createOrReplaceTempView("customers")
sql_customers = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY CustomerID) AS customer_key
    FROM customers;
"""
df_dim_customers = spark.sql(sql_customers)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['customer_key', 'CustomerID','CustomerType', 'AccountNumber', 
                   'ModifiedDate', 'TerritoryID']
df_dim_customers = df_dim_customers[ordered_columns]
df_dim_customers.toPandas().head(2)


| | customer_key | CustomerID | CustomerType | AccountNumber | ModifiedDate | TerritoryID |
|---|---|---|---|---|---|---|
| 0 | 1 | 1 | S | AW00000001 | 2004-10-13 11:15:07 | 1 |
| 1 | 2 | 2 | S | AW00000002 | 2004-10-13 11:15:07 | 1 |


In [17]:
df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")

In [18]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_customers;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_customers LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|        customer_key|                 int|   NULL|
|          CustomerID|                 int|   NULL|
|        CustomerType|              string|   NULL|
|       AccountNumber|              string|   NULL|
|        ModifiedDate|              string|   NULL|
|         TerritoryID|                 int|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|               Table|       dim_customers|       |
|        Created Time|Fri May 09 18:26:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.4|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|           

| | customer_key | CustomerID | CustomerType | AccountNumber | ModifiedDate | TerritoryID |
|---|---|---|---|---|---|---|
| 0 | 1 | 1 | S | AW00000001 | 2004-10-13 11:15:07 | 1 |
| 1 | 2 | 2 | S | AW00000002 | 2004-10-13 11:15:07 | 1 |


### 3b. df_employees

In [19]:
mongodb_args["collection"] = "employees"

df_dim_employees = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_employees.toPandas().head(2)

Unnamed: 0,BirthDate,ContactID,CurrentFlag,EmployeeID,Gender,HireDate,LoginID,ManagerID,MaritalStatus,ModifiedDate,NationalIDNumber,SalariedFlag,SickLeaveHours,Title,VacationHours
0,1972-05-15 00:00:00,1209,1,1,M,1996-07-31 00:00:00,adventure-works\guy1,16,M,2004-07-31 00:00:00,14417807,0,30,Production Technician - WC60,21
1,1977-06-03 00:00:00,1030,1,2,M,1997-02-26 00:00:00,adventure-works\kevin0,6,S,2004-07-31 00:00:00,253022876,0,41,Marketing Assistant,42


In [20]:
# ----------------------------------------------------------------------------------
# The collection already names its business key 'EmployeeID' (there is no 'id'
# column), so no rename is needed before adding the surrogate key.
# ----------------------------------------------------------------------------------

# ----------------------------------------------------------------------------------
# Add Primary Key column using SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------
df_dim_employees.createOrReplaceTempView("employees")
sql_employees = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY EmployeeID) AS employee_key
    FROM employees;
"""
df_dim_employees = spark.sql(sql_employees)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['employee_key', 'EmployeeID', 'BirthDate', 'ContactID', 'CurrentFlag', 
                   'Gender', 'HireDate', 'LoginID', 'ManagerID', 
                   'MaritalStatus', 'NationalIDNumber', 'SalariedFlag', 
                   'SickLeaveHours', 'Title', 'VacationHours', 'ModifiedDate']
df_dim_employees = df_dim_employees[ordered_columns]
df_dim_employees.toPandas().head(2)


Unnamed: 0,employee_key,EmployeeID,BirthDate,ContactID,CurrentFlag,Gender,HireDate,LoginID,ManagerID,MaritalStatus,NationalIDNumber,SalariedFlag,SickLeaveHours,Title,VacationHours,ModifiedDate
0,1,1,1972-05-15 00:00:00,1209,1,M,1996-07-31 00:00:00,adventure-works\guy1,16,M,14417807,0,30,Production Technician - WC60,21,2004-07-31 00:00:00
1,2,2,1977-06-03 00:00:00,1030,1,M,1997-02-26 00:00:00,adventure-works\kevin0,6,S,253022876,0,41,Marketing Assistant,42,2004-07-31 00:00:00


Save as the dim_employees table in the Data Lakehouse

In [21]:
df_dim_employees.write.saveAsTable(f"{dest_database}.dim_employees", mode="overwrite")

Unit Test

In [22]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_employees;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_employees LIMIT 2").toPandas()

+--------------------+------------------+-------+
|            col_name|         data_type|comment|
+--------------------+------------------+-------+
|        employee_key|               int|   NULL|
|          EmployeeID|               int|   NULL|
|           BirthDate|            string|   NULL|
|           ContactID|               int|   NULL|
|         CurrentFlag|            string|   NULL|
|              Gender|            string|   NULL|
|            HireDate|            string|   NULL|
|             LoginID|            string|   NULL|
|           ManagerID|               int|   NULL|
|       MaritalStatus|            string|   NULL|
|    NationalIDNumber|            string|   NULL|
|        SalariedFlag|            string|   NULL|
|      SickLeaveHours|               int|   NULL|
|               Title|            string|   NULL|
|       VacationHours|               int|   NULL|
|        ModifiedDate|            string|   NULL|
|                    |                  |       |


Unnamed: 0,employee_key,EmployeeID,BirthDate,ContactID,CurrentFlag,Gender,HireDate,LoginID,ManagerID,MaritalStatus,NationalIDNumber,SalariedFlag,SickLeaveHours,Title,VacationHours,ModifiedDate
0,1,1,1972-05-15 00:00:00,1209,1,M,1996-07-31 00:00:00,adventure-works\guy1,16,M,14417807,0,30,Production Technician - WC60,21,2004-07-31 00:00:00
1,2,2,1977-06-03 00:00:00,1030,1,M,1997-02-26 00:00:00,adventure-works\kevin0,6,S,253022876,0,41,Marketing Assistant,42,2004-07-31 00:00:00


# Verify Dimension Tables

In [23]:
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

| | namespace | tableName | isTemporary |
|---|---|---|---|
| 0 | adventureworks_dlh | dim_customers | False |
| 1 | adventureworks_dlh | dim_date | False |
| 2 | adventureworks_dlh | dim_employees | False |
| 3 | adventureworks_dlh | dim_products | False |
| 4 | | customers | True |
| 5 | | employees | True |
| 6 | | products | True |


# Use PySpark Structured Streaming to Process (Hot Path) <span style="color:darkred">Sales Orders</span> Fact Data
Verify the location of the source data files on the file system

In [24]:
get_file_info(sales_orders_stream_dir)

| | name | size | modification_time |
|---|---|---|---|
| 0 | sales_orders_01.json | 6481093 | 2025-05-09 04:07:02.155543089 |
| 1 | sales_orders_02.json | 6431521 | 2025-05-09 04:07:31.031861067 |
| 2 | sales_orders_03.json | 6431760 | 2025-05-09 04:09:08.718038797 |


## Bronze layer

In [25]:
df_sales_orders_bronze = (
    spark.readStream \
    .option("schemaLocation", sales_orders_output_bronze) \
    .option("maxFilesPerTrigger", 1) \
    .option("multiLine", "true") \
    .json(sales_orders_stream_dir)
)

df_sales_orders_bronze.isStreaming

True

In [26]:
sales_orders_checkpoint_bronze = os.path.join(sales_orders_output_bronze, '_checkpoint')

sales_orders_bronze_query = (
    df_sales_orders_bronze
    # Add Current Timestamp and Input Filename columns for traceability
    .withColumn("current_timestamp", current_timestamp())
    .withColumn("input_filename", input_file_name())
    .writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("sales_orders_bronze")
    .trigger(availableNow = True)
    .option("checkpointLocation", sales_orders_checkpoint_bronze)
    .option("compression", "snappy")
    .start(sales_orders_output_bronze)
)

In [27]:
print(f"Query ID: {sales_orders_bronze_query.id}")
print(f"Query Name: {sales_orders_bronze_query.name}")
print(f"Query Status: {sales_orders_bronze_query.status}")

Query ID: 6e04d728-5308-4eb8-ac94-d5dc167d1dcb
Query Name: sales_orders_bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [28]:
sales_orders_bronze_query.awaitTermination()
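
The Bronze query above relies on two ideas: `maxFilesPerTrigger` limits how many new files each micro-batch picks up, and the checkpoint records which files have already been processed so a restart does not ingest them twice. The plain-Python sketch below imitates that bookkeeping to make the behavior concrete. It is a simplified illustration, not Spark's implementation, and the helper `process_new_files` is hypothetical.

```python
import json
import os
import tempfile


def process_new_files(source_dir, checkpoint_path, max_files=1):
    """Pick up to `max_files` unseen JSON files and record them in the checkpoint."""
    seen = []
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path, encoding="utf-8") as fh:
            seen = json.load(fh)
    # Only files that the checkpoint has not recorded yet are candidates.
    pending = sorted(f for f in os.listdir(source_dir)
                     if f.endswith(".json") and f not in seen)
    batch = pending[:max_files]
    with open(checkpoint_path, "w", encoding="utf-8") as fh:
        json.dump(seen + batch, fh)
    return batch


# Demo with three empty stand-in files; the checkpoint lives outside the source folder.
src = tempfile.mkdtemp()
ckpt = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
for n in (1, 2, 3):
    with open(os.path.join(src, f"sales_orders_{n:02d}.json"), "w", encoding="utf-8") as fh:
        fh.write("[]")

batches = [process_new_files(src, ckpt) for _ in range(4)]
print(batches)
```

The fourth call returns an empty batch because every file is already recorded, which corresponds to the `availableNow` trigger stopping once the available input has been consumed.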

## Silver Layer: Integrate "Cold-path" Data & Make Transformations
- Prepare Role-Playing Dimension Primary and Business Keys
- Integrate "Cold-path" Data & Make Transformations
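
A role-playing dimension is a single dimension (here `dim_date`) that is referenced several times by the same fact row, once per role (order, ship, due, and modified dates). The sketch below shows the lookup in plain Python, assuming the fact dates arrive as `YYYY-MM-DD HH:MM:SS` strings and that `date_key` is the `yyyymmdd` integer seen in the `dim_date` output above. The helper names and sample order are hypothetical.

```python
from datetime import date, datetime


def to_date_key(d):
    """Encode a date as a yyyymmdd integer, e.g. 2000-01-01 becomes 20000101."""
    return d.year * 10000 + d.month * 100 + d.day


# A tiny stand-in for dim_date, mapping full_date to date_key.
dim_date = {d: to_date_key(d)
            for d in (date(2001, 7, 1), date(2001, 7, 8), date(2001, 7, 13))}


def resolve_date_keys(order, roles):
    """Look up each role's date in dim_date; unmatched dates yield None, like a left join."""
    keys = {}
    for source_col, key_col in roles.items():
        full_date = datetime.strptime(order[source_col], "%Y-%m-%d %H:%M:%S").date()
        keys[key_col] = dim_date.get(full_date)
    return keys


roles = {"OrderDate": "ordered_date_key",
         "ShipDate": "shipped_date_key",
         "DueDate": "due_date_key"}
# Hypothetical order whose DueDate has no matching row in the stand-in dimension.
order = {"OrderDate": "2001-07-01 00:00:00",
         "ShipDate": "2001-07-08 00:00:00",
         "DueDate": "2001-07-14 00:00:00"}
order_keys = resolve_date_keys(order, roles)
print(order_keys)
```

Giving each role its own aliased copy of the dimension, as the next cell does, keeps the column names distinct when all of the roles are joined to the same fact row.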

In [29]:
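# Employee role: surrogate key plus the EmployeeID business key (aliased as 'employee_contact_id')
df_dim_employee_contact = df_dim_employees.select(col("employee_key").alias("employee_key"), col("EmployeeID").alias("employee_contact_id"))

# Customer role: surrogate key plus the CustomerID business key
df_dim_customer_contact = df_dim_customers.select(col("customer_key").alias("customer_key"), col("CustomerID").alias("customer_id"))


# Role-playing date dimensions: one aliased copy of dim_date for each date column in the fact data
df_dim_due_date = df_dim_date.select(col("date_key").alias("due_date_key"),  col("full_date").alias("due_full_date"))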

df_dim_order_date = df_dim_date.select(col("date_key").alias("ordered_date_key"),  col("full_date").alias("ordered_full_date"))

df_dim_ship_date = df_dim_date.select(col("date_key").alias("shipped_date_key"),  col("full_date").alias("shipped_full_date"))

df_dim_modified_date = df_dim_date.select(col("date_key").alias("modified_date_key"),  col("full_date").alias("full_modified_date"))


In [30]:
df_sales_orders_bronze.printSchema()

root
 |-- AccountNumber: string (nullable = true)
 |-- BillToAddressID: long (nullable = true)
 |-- ContactID: long (nullable = true)
 |-- CreditCardApprovalCode: string (nullable = true)
 |-- CreditCardID: long (nullable = true)
 |-- CurrencyRateID: long (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- DueDate: string (nullable = true)
 |-- Freight: double (nullable = true)
 |-- ModifiedDate: string (nullable = true)
 |-- OnlineOrderFlag: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- RevisionNumber: long (nullable = true)
 |-- SalesOrderID: long (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesPersonID: long (nullable = true)
 |-- ShipDate: string (nullable = true)
 |-- ShipMethodID: long (nullable = true)
 |-- ShipToAddressID: long (nullable = true)
 |-- Status: long (nullable = true)
 |-- SubTotal: double (nullable = true)
 |-- TaxAmt: double (nullable = true)
 |-- Te

Define Silver Query to Join Streaming with Batch Data

In [31]:
df_sales_orders_silver = spark.readStream.format("parquet").load(sales_orders_output_bronze) \
    .join(df_dim_employee_contact, df_dim_employee_contact.employee_contact_id == col("ContactID").cast(IntegerType()), 'left') \
    .join(df_dim_customer_contact, df_dim_customer_contact.customer_id == col("CustomerID").cast(IntegerType()), 'left') \
    .join(df_dim_due_date, df_dim_due_date.due_full_date.cast(DateType()) == col("DueDate").cast(DateType()), "left") \
    .join(df_dim_order_date, df_dim_order_date.ordered_full_date.cast(DateType()) == col("OrderDate").cast(DateType()), "left") \
    .join(df_dim_ship_date, df_dim_ship_date.shipped_full_date.cast(DateType()) == col("ShipDate").cast(DateType()), "left") \
    .join(df_dim_modified_date, df_dim_modified_date.full_modified_date.cast(DateType()) == col("ModifiedDate").cast(DateType()), "left") \
    .select(col("SalesOrderID").cast(LongType()), \
            df_dim_employee_contact.employee_key.cast(LongType()), \
            df_dim_customer_contact.customer_key.cast(LongType()), \
            df_dim_due_date.due_date_key.cast(LongType()), \
            df_dim_order_date.ordered_date_key.cast(LongType()), \
            df_dim_ship_date.shipped_date_key.cast(LongType()), \
            df_dim_modified_date.modified_date_key.cast(LongType()), \
            col("AccountNumber"), \
            col("BillToAddressID"), \
            col("CreditCardApprovalCode"), \
            col("CreditCardID"), \
            col("CurrencyRateID"), \
            col("Freight"), \
            col("OnlineOrderFlag"), \
            col("PurchaseOrderNumber"), \
            col("RevisionNumber"), \
            col("SalesOrderNumber"), \
            col("SalesPersonID"), \
            col("ShipMethodID"), \
            col("ShipToAddressID"), \
            col("Status"), \
            col("SubTotal"), \
            col("TaxAmt"), \
            col("TerritoryID"), \
            col("TotalDue")
            )
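
The left joins in the Silver query keep every streamed order and attach a surrogate key wherever the dimension has a match. A minimal plain-Python sketch of that behaviour for one date column follows; the sample rows, the `to_date` helper, and the `YYYY-MM-DD HH:MM:SS` string format are assumptions made for illustration.

```python
from datetime import datetime

# Hypothetical lookup built from a date dimension: calendar date -> surrogate key
due_date_keys = {
    datetime(2001, 7, 13).date(): 20010713,
}

# Hypothetical bronze rows; the timestamp string format is an assumption
orders = [
    {"SalesOrderID": 1, "DueDate": "2001-07-13 00:00:00"},
    {"SalesOrderID": 2, "DueDate": "1999-01-01 00:00:00"},  # no match in the dimension
]

def to_date(text):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string and keep only the calendar date."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S").date()

# Left join: every order is kept; an unmatched date yields a None key
silver = [
    {"SalesOrderID": o["SalesOrderID"],
     "due_date_key": due_date_keys.get(to_date(o["DueDate"]))}
    for o in orders
]
print(silver)
```

Rows with a `None` key are worth checking after a real run, since they usually indicate a date outside the range covered by the date dimension.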

In [32]:
df_sales_orders_silver.isStreaming

True

In [33]:
df_sales_orders_silver.printSchema()

root
 |-- SalesOrderID: long (nullable = true)
 |-- employee_key: long (nullable = true)
 |-- customer_key: long (nullable = true)
 |-- due_date_key: long (nullable = true)
 |-- ordered_date_key: long (nullable = true)
 |-- shipped_date_key: long (nullable = true)
 |-- modified_date_key: long (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- BillToAddressID: long (nullable = true)
 |-- CreditCardApprovalCode: string (nullable = true)
 |-- CreditCardID: long (nullable = true)
 |-- CurrencyRateID: long (nullable = true)
 |-- Freight: double (nullable = true)
 |-- OnlineOrderFlag: string (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- RevisionNumber: long (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesPersonID: long (nullable = true)
 |-- ShipMethodID: long (nullable = true)
 |-- ShipToAddressID: long (nullable = true)
 |-- Status: long (nullable = true)
 |-- SubTotal: double (nullable = true)
 |-- TaxAmt: double (nu

Write the Transformed Streaming data to the Data Lakehouse

In [34]:
sales_orders_checkpoint_silver = os.path.join(sales_orders_output_silver, '_checkpoint')

# writeStream, in 'parquet' format, to 'sales_orders_output_silver' in 'append' mode
sales_orders_silver_query = (
    df_sales_orders_silver.writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("sales_orders_silver")
    .trigger(availableNow = True)
    .option("checkpointLocation", sales_orders_checkpoint_silver)
    .option("compression", "snappy")
    .start(sales_orders_output_silver)
)
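
The checkpoint location lets a restarted query skip input it has already processed. The toy below is only a conceptual analogy in plain Python, not how Spark implements checkpointing; the `process_new_files` helper, the file names, and the JSON checkpoint file are all hypothetical.

```python
import json
import os
import tempfile

def process_new_files(input_files, checkpoint_path):
    """Return the files not yet recorded in the checkpoint, then record them."""
    done = set()
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as fh:
            done = set(json.load(fh))
    new_files = [f for f in input_files if f not in done]
    with open(checkpoint_path, "w") as fh:
        json.dump(sorted(done | set(new_files)), fh)
    return new_files

# Hypothetical checkpoint file in a temporary directory
checkpoint = os.path.join(tempfile.mkdtemp(), "_checkpoint.json")

first_run = process_new_files(["orders_01.json", "orders_02.json"], checkpoint)
second_run = process_new_files(
    ["orders_01.json", "orders_02.json", "orders_03.json"], checkpoint
)
print(first_run)
print(second_run)
```

In this toy, the second run only picks up the file that arrived after the first run, which is the property that makes re-running an append-mode cell safe.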

In [35]:
print(f"Query ID: {sales_orders_silver_query.id}")
print(f"Query Name: {sales_orders_silver_query.name}")
print(f"Query Status: {sales_orders_silver_query.status}")

Query ID: 4b8052e4-b7d5-41f5-bd5b-3a5051d9e57b
Query Name: sales_orders_silver
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [36]:
sales_orders_silver_query.awaitTermination()

In [37]:
df_dim_order_date.toPandas().head()

   ordered_date_key  ordered_full_date
0          20000101         2000-01-01
1          20000102         2000-01-02
2          20000103         2000-01-03
3          20000104         2000-01-04
4          20000105         2000-01-05
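
The five rows shown suggest that the date key is the calendar date written as a `YYYYMMDD` integer. That is an inference from this output, not something the notebook states. Under that assumption, the conversion in both directions can be sketched with two hypothetical helpers:

```python
from datetime import date

def to_date_key(d):
    """Encode a date as a YYYYMMDD integer (assumed key format)."""
    return d.year * 10000 + d.month * 100 + d.day

def from_date_key(key):
    """Decode a YYYYMMDD integer back into a date."""
    return date(key // 10000, key // 100 % 100, key % 100)

key = to_date_key(date(2000, 1, 5))
print(key)
print(from_date_key(key))
```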


## Gold Layer: Perform Aggregations

Business Report Possibilities/Thoughts:
- Identify top-performing salespeople and the regions in which they excel.
- Use those results to reward high performers, optimize salesforce allocation, and mentor underperformers.
- Evaluate geographic market strength based on TerritoryID.
- Use Number_of_Orders to distinguish high-volume, low-value territories/employees (many small orders) from low-volume, high-value ones (few but large deals).
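
The volume-versus-value distinction in the last bullet can be sketched as a simple classification. The `classify` function and both thresholds are hypothetical and would need tuning against the real distribution; the two sample calls reuse figures from the report output at the end of this notebook.

```python
def classify(number_of_orders, average_order_value,
             order_threshold=10, value_threshold=50000.0):
    """Label an employee/territory pair by order volume and order value.

    Both thresholds are illustrative assumptions, not values from the project.
    """
    volume = "high-volume" if number_of_orders >= order_threshold else "low-volume"
    value = "high-value" if average_order_value >= value_threshold else "low-value"
    return f"{volume}, {value}"

few_large_deals = classify(4, 97735.466825)
many_small_orders = classify(12, 32206.912)
print(few_large_deals)
print(many_small_orders)
```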

In [38]:
# Aggregate order count, total revenue, and average order value per employee and territory.
# Assumes count, sum, avg, and desc were imported from pyspark.sql.functions earlier in the notebook.
df_orders_by_product_category_gold = spark.readStream.format("parquet").load(sales_orders_output_silver) \
    .join(df_dim_employee_contact, "employee_key") \
    .join(df_dim_customer_contact, "customer_key") \
    .join(df_dim_due_date, "due_date_key") \
    .join(df_dim_order_date, "ordered_date_key") \
    .join(df_dim_ship_date, "shipped_date_key") \
    .join(df_dim_modified_date, "modified_date_key") \
    .groupBy("employee_key", "TerritoryID") \
    .agg(count("SalesOrderID").alias("Number_of_Orders"),
         sum("TotalDue").alias("Total_Revenue"),
         avg("TotalDue").alias("Average_Order_Value")) \
    .orderBy(desc("Total_Revenue"))
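
To make the shape of this aggregation concrete, here is a plain-Python sketch of the same group-by over a few invented rows. The sample values are hypothetical; only the column names come from the notebook.

```python
from collections import defaultdict

# Hypothetical silver rows (values invented for illustration)
rows = [
    {"SalesOrderID": 1, "employee_key": 10, "TerritoryID": 1, "TotalDue": 100.0},
    {"SalesOrderID": 2, "employee_key": 10, "TerritoryID": 1, "TotalDue": 300.0},
    {"SalesOrderID": 3, "employee_key": 20, "TerritoryID": 2, "TotalDue": 250.0},
]

# Group TotalDue values by (employee_key, TerritoryID)
groups = defaultdict(list)
for r in rows:
    groups[(r["employee_key"], r["TerritoryID"])].append(r["TotalDue"])

# One output row per group, sorted by total revenue, highest first
report = sorted(
    (
        {
            "employee_key": emp,
            "TerritoryID": terr,
            "Number_of_Orders": len(values),
            "Total_Revenue": sum(values),
            "Average_Order_Value": sum(values) / len(values),
        }
        for (emp, terr), values in groups.items()
    ),
    key=lambda row: row["Total_Revenue"],
    reverse=True,
)
for row in report:
    print(row)
```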

In [39]:
df_orders_by_product_category_gold.printSchema()

root
 |-- employee_key: long (nullable = true)
 |-- TerritoryID: long (nullable = true)
 |-- Number_of_Orders: long (nullable = false)
 |-- Total_Revenue: double (nullable = true)
 |-- Average_Order_Value: double (nullable = true)



Writing the streaming data to an in-memory table in "complete" mode

In [40]:
# create the new "fact_sales_orders_by_employee_territory" in-memory query
sales_orders_gold_query = (
    df_orders_by_product_category_gold.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("fact_sales_orders_by_employee_territory")
    .start()
)

In [41]:
wait_until_stream_is_ready(sales_orders_gold_query, 1)

The stream has processed 1 batchs
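
The `wait_until_stream_is_ready` helper is defined outside this section, so its exact body is not shown here. One plausible shape, offered only as an assumption, is a loop that polls the query's `recentProgress` list until enough batches have completed. The `FakeQuery` class is a hypothetical stand-in so the sketch runs without Spark.

```python
import time

def wait_until_stream_is_ready(query, min_batches=1, poll_seconds=0.1):
    """Block until the query reports at least min_batches progress entries."""
    processed = len(query.recentProgress)
    while processed < min_batches:
        time.sleep(poll_seconds)
        processed = len(query.recentProgress)
    print(f"The stream has processed {processed} batches")
    return processed

class FakeQuery:
    """Stand-in that reports one more finished batch each time it is polled."""

    def __init__(self):
        self._batches = []

    @property
    def recentProgress(self):
        self._batches.append({"batchId": len(self._batches)})
        return list(self._batches)

processed = wait_until_stream_is_ready(FakeQuery(), min_batches=2, poll_seconds=0.01)
```

Waiting like this matters for the memory sink: querying the in-memory table before the first batch completes would return an empty result.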


Querying the Gold Data from Memory

In [42]:
df_fact_sales_orders_by_employee_territory = spark.sql("SELECT * FROM fact_sales_orders_by_employee_territory")
df_fact_sales_orders_by_employee_territory.printSchema()

root
 |-- employee_key: long (nullable = true)
 |-- TerritoryID: long (nullable = true)
 |-- Number_of_Orders: long (nullable = false)
 |-- Total_Revenue: double (nullable = true)
 |-- Average_Order_Value: double (nullable = true)



Creating the Final Selection

In [43]:
df_fact_sales_orders_by_employee_territory_gold_final = df_fact_sales_orders_by_employee_territory \
.select(col("employee_key").alias("Employee"), \
        col("TerritoryID").alias("Location"), \
        col("Total_Revenue").alias("Total Revenue"), \
        col("Number_of_Orders").alias("Total Number of Orders"), \
        col("Average_Order_Value").alias("Average Order Value")) \
.orderBy(desc("Total Revenue"), desc("Average Order Value"), desc("Total Number of Orders"))

Loading the final results into a new table and displaying the results (SQL)

In [44]:
df_fact_sales_orders_by_employee_territory_gold_final.write.saveAsTable(f"{dest_database}.fact_sales_orders_by_employee_territory", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.fact_sales_orders_by_employee_territory").toPandas()

     Employee  Location  Total Revenue  Total Number of Orders  Average Order Value
  0       267         1    206180.1474                       4         51545.036850
  1       272         8    205237.8183                       4         51309.454575
  2       185         1    204090.5716                       4         51022.642900
  3       116         1    203746.6920                       4         50936.673000
  4       167         9    194536.3520                       4         48634.088000
...       ...       ...            ...                     ...                  ...
254        96         7    390941.8673                       4         97735.466825
255       201         2    388552.7446                       8         48569.093075
256       245         6    386482.9440                      12         32206.912000
257        33         6    386084.6761                       8         48260.584513


In [45]:
spark.stop()