FINAL PROJECT, Katherine Anne Rukavina, prh9ss

Imports

In [1]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from pyspark.sql.types import *
from pyspark.sql.window import Window as W

/opt/anaconda3/envs/pysparkenv/lib/python3.12/site-packages/pyspark


Connection Variables: MySQL and MongoDB

In [2]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
# Credentials come from the environment so that no secret is stored in the
# notebook. Export MYSQL_PASSWORD (and optionally MYSQL_USER) before starting
# Jupyter; the password falls back to an empty string when it is not set.
mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "adventureworks",
    "conn_props" : {
        "user" : os.environ.get("MYSQL_USER", "root"),
        "password" : os.environ.get("MYSQL_PASSWORD", ""),
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
# cluster_location must be "atlas" or "local" (validated by get_mongo_uri).
# user_name / password / cluster_name / cluster_subnet are only used for "atlas".
# collection is filled in right before each read; null_column_threshold is the
# null ratio above which get_mongodb_dataframe drops a column.
mongodb_args = {
    "cluster_location" : "local",
    "user_name" : os.environ.get("MONGODB_USER", ""),
    "password" : os.environ.get("MONGODB_PASSWORD", ""),
    "cluster_name" : "sandbox",
    "cluster_subnet" : "zibbf",
    "db_name" : "adventureworks",
    "collection" : "",
    "null_column_threshold" : 0.5
}

Source Data Directory Structure

In [3]:
# Source data layout: <cwd>/final_data/adventureworks/{batch, streaming/sales_orders}
base_dir = os.path.join(os.getcwd(), 'final_data')
data_dir = os.path.join(base_dir, 'adventureworks')
batch_dir, stream_dir = (
    os.path.join(data_dir, leaf) for leaf in ('batch', 'streaming')
)
orders_stream_dir = os.path.join(stream_dir, 'sales_orders')

Data Lakehouse Files Directory Structure

In [4]:
# Name of the lakehouse database and the on-disk folder that backs it.
dest_database = "adventureworks_dlh"
dest_database_dir = dest_database + ".db"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

# One output directory per medallion layer of the sales-orders fact table.
orders_output_bronze, orders_output_silver, orders_output_gold = (
    os.path.join(database_dir, 'fact_sales_orders', layer)
    for layer in ('bronze', 'silver', 'gold')
)

Sales Orders Batch Structure: 3 Intervals

In [5]:
# Split the batch sales-orders file into three smaller JSON files inside the
# streaming directory; the bronze reader later consumes them one file per
# trigger (maxFilesPerTrigger = 1).
# The path variable was previously called `input`, which shadowed the builtin.
sales_orders_batch_json = os.path.join(batch_dir, "fact_sales_orders.json")
os.makedirs(orders_stream_dir, exist_ok=True)  # streaming directory

with open(sales_orders_batch_json, "r") as f:
    data = json.load(f)  # sliced below, so the file is assumed to hold a JSON array

# Integer division: the last interval absorbs the remainder so no record is lost.
chunk = len(data) // 3
parts = [data[:chunk], data[chunk:2*chunk], data[2*chunk:]]

# Writes sales_orders_01.json, sales_orders_02.json, sales_orders_03.json
for part_number, part in enumerate(parts, start=1):
    part_path = os.path.join(orders_stream_dir, f"sales_orders_{part_number:02d}.json")
    with open(part_path, "w") as f:
        json.dump(part, f)

Global Setup Functions

In [6]:
def get_file_info(path: str):
    """Summarise the regular files located directly inside ``path``.

    Sub-directories are ignored and the listing is not recursive.

    Args:
        path: Directory to inspect.

    Returns:
        A pandas DataFrame with one row per file, sorted by file name, and the
        columns ``name``, ``size`` (as reported by ``os.path.getsize``) and
        ``modification_time`` (``os.path.getmtime`` converted with
        ``pd.to_datetime(..., unit='s')``).
    """
    names = sorted(
        entry for entry in os.listdir(path)
        if os.path.isfile(os.path.join(path, entry))
    )

    rows = []
    for name in names:
        full_path = os.path.join(path, name)
        rows.append((
            name,
            os.path.getsize(full_path),
            pd.to_datetime(os.path.getmtime(full_path), unit='s'),
        ))

    return pd.DataFrame(data=rows, columns=['name', 'size', 'modification_time'])


def wait_until_stream_is_ready(query, min_batches=1, poll_seconds=5):
    """Block until ``query`` has reported at least ``min_batches`` progress entries.

    Args:
        query: Streaming query handle exposing ``recentProgress`` (a sequence)
            and, optionally, ``isActive``.
        min_batches: Number of progress entries to wait for.
        poll_seconds: Seconds to sleep between checks (defaults to 5).

    Raises:
        RuntimeError: If the query is no longer active before enough progress
            has been reported.
    """
    while len(query.recentProgress) < min_batches:
        # A stopped or failed query never reports more progress; without this
        # check the loop would spin forever.
        if not getattr(query, "isActive", True):
            raise RuntimeError(
                f"The stream stopped after {len(query.recentProgress)} batches; "
                f"expected at least {min_batches}."
            )
        time.sleep(poll_seconds)

    print(f"The stream has processed {len(query.recentProgress)} batches")


def remove_directory_tree(path: str):
    """Recursively delete ``path`` and report the outcome as a message.

    Never raises: any exception is caught and turned into the returned text.

    Args:
        path: Directory to remove.

    Returns:
        A string stating that the directory was removed, did not exist, or
        that an error occurred (including the error text).
    """
    try:
        if not os.path.exists(path):
            return f"Directory '{path}' does not exist."
        shutil.rmtree(path)
    except Exception as error:
        return f"An error occurred: {error}"

    return f"Directory '{path}' has been removed successfully."
        

def drop_null_columns(df, threshold):
    """Drop every column whose share of null values exceeds ``threshold``.

    Args:
        df: DataFrame exposing ``columns``, ``count()``, ``filter()``,
            ``drop()`` and column access via ``df[name]``.
        threshold: Null ratio (null rows / total rows) above which a column
            is dropped.

    Returns:
        ``df`` without the mostly-null columns. An empty ``df`` is returned
        unchanged because no ratio can be computed for it.
    """
    # Counted once up front; the original re-ran df.count() for every column.
    total_rows = df.count()
    if total_rows == 0:
        # Avoids the ZeroDivisionError the ratio below would raise.
        return df

    columns_with_nulls = [
        name for name in df.columns
        if df.filter(df[name].isNull()).count() / total_rows > threshold
    ]

    return df.drop(*columns_with_nulls)
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    """Run ``sql_query`` against MySQL through the JDBC reader.

    Args:
        spark_session: Session whose ``read`` interface is used.
        sql_query: SQL text passed as the JDBC ``query`` option.
        **args: Connection settings: ``host_name``, ``port``, ``db_name`` and
            a ``conn_props`` dict holding ``driver``, ``user`` and ``password``.

    Returns:
        Whatever the reader's ``load()`` returns for the query.
    """
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"

    reader = spark_session.read.format("jdbc")
    conn_props = args['conn_props']
    reader_options = (
        ("url", jdbc_url),
        ("driver", conn_props['driver']),
        ("user", conn_props['user']),
        ("password", conn_props['password']),
        ("query", sql_query),
    )
    for option_name, option_value in reader_options:
        reader = reader.option(option_name, option_value)

    return reader.load()
    

def get_mongo_uri(**args):
    """Build the MongoDB connection URI for the configured cluster.

    Args:
        **args: Must contain ``cluster_location`` ("atlas" or "local"). For
            "atlas" it must also contain ``user_name``, ``password``,
            ``cluster_name`` and ``cluster_subnet``.

    Returns:
        ``mongodb+srv://<user>:<password>@<cluster_name>.<cluster_subnet>.mongodb.net/``
        for "atlas", otherwise ``mongodb://localhost:27017/``.

    Raises:
        Exception: If ``cluster_location`` is neither "atlas" nor "local".
    """
    # Local import keeps this helper self-contained.
    from urllib.parse import quote_plus

    location = args["cluster_location"]
    if location not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")

    if location == "atlas":
        # Reserved characters such as '@', ':' or '/' in the credentials would
        # otherwise corrupt the URI, so percent-escape them. Plain
        # alphanumeric credentials are unchanged.
        user = quote_plus(str(args['user_name']))
        password = quote_plus(str(args['password']))
        uri = f"mongodb+srv://{user}:{password}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    """Assemble the keyword arguments consumed by ``get_spark_conf``.

    Args:
        spark_jars: Paths of extra jar files; joined into one ", "-separated string.
        **args: MongoDB connection settings forwarded to ``get_mongo_uri``.

    Returns:
        A dict with the keys ``app_name``, ``worker_threads``,
        ``shuffle_partitions``, ``mongo_uri``, ``spark_jars`` and
        ``database_dir``.
    """
    # os.cpu_count() returns None when the CPU count cannot be determined.
    cpu_count = os.cpu_count() or 1
    # Use half the CPUs, but never fewer than one thread: a single-core host
    # would otherwise produce "local[0]".
    worker_count = max(1, cpu_count // 2)

    sparkConf_args = {
        "app_name" : "PySpark Adventureworks Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{worker_count}]",
        "shuffle_partitions" : cpu_count,
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : ", ".join(f"{jar}" for jar in spark_jars),
        "database_dir" : sql_warehouse_dir
    }

    return sparkConf_args
    

def get_spark_conf(**args):
    """Create the SparkConf used by this notebook's session.

    Args:
        **args: Expects ``app_name``, ``worker_threads`` (the master URL),
            ``spark_jars``, ``mongo_uri``, ``shuffle_partitions`` and
            ``database_dir``, as produced by ``get_spark_conf_args``.

    Returns:
        A SparkConf with the application name, master and all settings applied.
    """
    settings = [
        ('spark.driver.memory', '4g'),
        ('spark.executor.memory', '2g'),
        ('spark.jars', args['spark_jars']),
        ('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1'),
        # The same URI serves both MongoDB reads and writes.
        ('spark.mongodb.input.uri', args['mongo_uri']),
        ('spark.mongodb.output.uri', args['mongo_uri']),
        ('spark.sql.adaptive.enabled', 'false'),
        ('spark.sql.debug.maxToStringFields', 35),
        ('spark.sql.shuffle.partitions', args['shuffle_partitions']),
        ('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true'),
        ('spark.sql.streaming.schemaInference', 'true'),
        ('spark.sql.warehouse.dir', args['database_dir']),
        ('spark.streaming.stopGracefullyOnShutdown', 'true'),
    ]

    sparkConf = SparkConf().setAppName(args['app_name']).setMaster(args['worker_threads'])
    for key, value in settings:
        sparkConf = sparkConf.set(key, value)

    return sparkConf


def get_mongo_client(**args):
    """Open a PyMongo client for the configured cluster.

    Args:
        **args: MongoDB connection settings; ``cluster_location`` selects the
            client flavour and everything is forwarded to ``get_mongo_uri``.

    Returns:
        A ``pymongo.MongoClient``; Atlas connections also receive certifi's
        CA bundle through ``tlsCAFile``.

    Raises:
        Exception: If ``cluster_location`` is neither "atlas" nor "local".
    """
    mongo_uri = get_mongo_uri(**args)
    location = args['cluster_location']

    if location == "local":
        return pymongo.MongoClient(mongo_uri)

    if location == "atlas":
        return pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    raise Exception("A MongoDB Client could not be created.")
    
    
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : dict):
    """Replace MongoDB collections with the contents of JSON files.

    Each collection is dropped and then repopulated from its file. The client
    is always closed before returning, even when loading fails.

    Args:
        mongo_client: Client object supporting ``client[db_name]`` and ``close()``.
        db_name: Database that holds the collections.
        data_directory: Directory containing the JSON files.
        json_files: Mapping of collection name -> JSON file name. Each file is
            passed to ``insert_many``, so it is expected to hold a list of
            documents.
    """
    try:
        db = mongo_client[db_name]

        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)

            json_path = os.path.join(data_directory, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)

            # insert_many rejects an empty document list, so an empty file
            # simply leaves the collection dropped.
            if documents:
                db[collection_name].insert_many(documents)
    finally:
        mongo_client.close()
    

def get_mongodb_dataframe(spark_session, **args):
    """Load a MongoDB collection and strip columns that are mostly null.

    Args:
        spark_session: Session whose ``read`` interface is used.
        **args: Expects ``db_name``, ``collection`` and ``null_column_threshold``.

    Returns:
        The collection's DataFrame without the ``_id`` column and without any
        column that ``drop_null_columns`` removes for the given threshold.
    """
    reader = spark_session.read.format("com.mongodb.spark.sql.DefaultSource")
    reader = reader.option("database", args['db_name'])
    reader = reader.option("collection", args['collection'])

    # MongoDB's _id column is not needed downstream.
    collection_df = reader.load().drop('_id')

    return drop_null_columns(collection_df, args['null_column_threshold'])

Initialize Data Lakehouse

In [7]:
# Remove the database directory left by a previous run; the returned status message is the cell output.
remove_directory_tree(database_dir)

"Directory '/Users/karukavina/Documents/VSCODEFiles/DS/FinalProject/main/spark-warehouse/adventureworks_dlh.db' has been removed successfully."

Create Spark Session

In [8]:
# The MySQL JDBC driver folder sits one level above the notebook's working
# directory. Set MYSQL_CONNECTOR_JAR to point elsewhere instead of editing a
# machine-specific absolute path into the notebook.
mysql_spark_jar = os.environ.get(
    "MYSQL_CONNECTOR_JAR",
    os.path.join(os.path.dirname(os.getcwd()),
                 "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar"),
)

# Only the MySQL driver is needed; the unused worker_threads and
# mssql_spark_jar variables were removed.
jars = [mysql_spark_jar]

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
# Logging is switched off entirely, so Spark warnings and errors are not printed from here on.
spark.sparkContext.setLogLevel("OFF")
spark

25/12/19 17:07:26 WARN Utils: Your hostname, KAs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.4.82 instead (on interface en0)
25/12/19 17:07:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/karukavina/.ivy2/cache
The jars for the packages stored in: /Users/karukavina/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-457afb2c-473b-40fc-a9c3-4019eb0556a0;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 79ms :: artifacts dl 4ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-

:: loading settings :: url = jar:file:/opt/anaconda3/envs/pysparkenv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


25/12/19 17:07:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Create Metadata database

In [9]:
# Drop the lakehouse database (CASCADE also drops its tables) if a previous run left it behind.
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

# (Re)create the empty database that the dimension and fact tables are saved into below.
sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 AdventureWorks Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Adventureworks');
"""
spark.sql(sql_create_db)

DataFrame[]

### Populating Data In Three Parts

Source Data Files Location

In [10]:
# List the batch source files (name, size, modification time) before loading them.
get_file_info(batch_dir)

Unnamed: 0,name,size,modification_time
0,dim_customers.csv,134342,2025-12-19 04:10:47.456573248
1,dim_date.csv,138125,2025-12-19 04:10:52.215084791
2,dim_employees.csv,48471,2025-12-19 04:10:55.419627190
3,dim_products.csv,72529,2025-12-19 04:10:57.740822554
4,dim_vendors.csv,12169,2025-12-19 04:11:00.130044221
5,fact_purchase_orders.json,566337,2025-12-19 04:42:21.120588541
6,fact_sales_orders.json,1028552,2025-10-27 00:18:01.239309788


## Part 1: Retrieving Data From CSV

Populating Customer Dimension

In [11]:
# Read the customer dimension from its CSV extract (header row, inferred column types).

customer_csv = os.path.join(batch_dir, 'dim_customers.csv')
print(customer_csv)

df_dim_customers = spark.read.format('csv').options(header='true', inferSchema='true').load(customer_csv)
# Preview two rows. limit() is applied in Spark, so only those rows are
# collected to the driver (toPandas().head() pulled the whole table first).
df_dim_customers.limit(2).toPandas()

/Users/karukavina/Documents/VSCODEFiles/DS/FinalProject/main/final_data/adventureworks/batch/dim_customers.csv


Unnamed: 0,customer_key,customer_id,AccountNumber,CustomerType,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,IsOnlyStateProvinceFlag,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,1,AW00000001,S,Main Office,2251 Elliot Avenue,,Seattle,WA,Washington,0,98104,US,United States,North America,Northwest
1,2,2,AW00000002,S,Shipping,7943 Walnut Ave,,Renton,WA,Washington,0,98055,US,United States,North America,Northwest


In [12]:
# Transformations: keep only the customer columns stored in the lakehouse, in this order.

ordered_columns = ['customer_key', 'customer_id', 'AccountNumber', 'AddressLine1', 'AddressLine2', 'City',
                   'StateProvinceCode', 'State_Province', 'PostalCode', 'CountryRegionCode', 'Country_Region',
                    'Sales Territory Group', 'Sales Territory']

df_dim_customers = df_dim_customers[ordered_columns]
# Preview two rows without collecting the whole table to the driver.
df_dim_customers.limit(2).toPandas()

Unnamed: 0,customer_key,customer_id,AccountNumber,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,1,AW00000001,2251 Elliot Avenue,,Seattle,WA,Washington,98104,US,United States,North America,Northwest
1,2,2,AW00000002,7943 Walnut Ave,,Renton,WA,Washington,98055,US,United States,North America,Northwest


In [13]:
# Save the customer dimension as a table in the lakehouse database, overwriting any existing table.

df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")

In [14]:
# Testing: show the saved table's schema and metadata, then read two rows back to confirm the write.

spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_customers;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_customers LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|        customer_key|                 int|   NULL|
|         customer_id|                 int|   NULL|
|       AccountNumber|              string|   NULL|
|        AddressLine1|              string|   NULL|
|        AddressLine2|              string|   NULL|
|                City|              string|   NULL|
|   StateProvinceCode|              string|   NULL|
|      State_Province|              string|   NULL|
|          PostalCode|              string|   NULL|
|   CountryRegionCode|              string|   NULL|
|      Country_Region|              string|   NULL|
|Sales Territory G...|              string|   NULL|
|     Sales Territory|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|           

Unnamed: 0,customer_key,customer_id,AccountNumber,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,1,AW00000001,2251 Elliot Avenue,,Seattle,WA,Washington,98104,US,United States,North America,Northwest
1,2,2,AW00000002,7943 Walnut Ave,,Renton,WA,Washington,98055,US,United States,North America,Northwest


Populating Employee Dimension

In [15]:
# Read the employee dimension from its CSV extract (header row, inferred column types).

employee_csv = os.path.join(batch_dir, 'dim_employees.csv')
print(employee_csv)

df_dim_employees = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
# Preview two rows. limit() is applied in Spark, so only those rows are
# collected to the driver (toPandas().head() pulled the whole table first).
df_dim_employees.limit(2).toPandas()

/Users/karukavina/Documents/VSCODEFiles/DS/FinalProject/main/final_data/adventureworks/batch/dim_employees.csv


Unnamed: 0,employee_key,employee_id,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,1,14417807,adventure-works\guy1,16,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15,M,M,1996-07-31,0,21,30,1
1,2,2,253022876,adventure-works\kevin0,6,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03,S,M,1997-02-26,0,42,41,1


In [16]:
# Transformations: keep only the employee columns stored in the lakehouse, in this order.

ordered_columns = ['employee_key', 'employee_id', 'FirstName', 'MiddleName', 'LastName', 'NationalIDNumber', 'LoginID',
                   'Title', 'EmailAddress', 'EmailPromotion', 'Phone', 'BirthDate', 'MaritalStatus', 'Gender', 
                   'HireDate', 'SalariedFlag', 'VacationHours', 'SickLeaveHours']

df_dim_employees = df_dim_employees[ordered_columns]
# Preview two rows without collecting the whole table to the driver.
df_dim_employees.limit(2).toPandas()

Unnamed: 0,employee_key,employee_id,FirstName,MiddleName,LastName,NationalIDNumber,LoginID,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours
0,1,1,Guy,R,Gilbert,14417807,adventure-works\guy1,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15,M,M,1996-07-31,0,21,30
1,2,2,Kevin,F,Brown,253022876,adventure-works\kevin0,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03,S,M,1997-02-26,0,42,41


In [17]:
# Save the employee dimension as a table in the lakehouse database, overwriting any existing table.

df_dim_employees.write.saveAsTable(f"{dest_database}.dim_employees", mode="overwrite")

In [18]:
# Testing: show the saved table's schema and metadata, then read two rows back to confirm the write.

spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_employees;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_employees LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|        employee_key|      int|   NULL|
|         employee_id|      int|   NULL|
|           FirstName|   string|   NULL|
|          MiddleName|   string|   NULL|
|            LastName|   string|   NULL|
|    NationalIDNumber|      int|   NULL|
|             LoginID|   string|   NULL|
|               Title|   string|   NULL|
|        EmailAddress|   string|   NULL|
|      EmailPromotion|      int|   NULL|
|               Phone|   string|   NULL|
|           BirthDate|     date|   NULL|
|       MaritalStatus|   string|   NULL|
|              Gender|   string|   NULL|
|            HireDate|     date|   NULL|
|        SalariedFlag|      int|   NULL|
|       VacationHours|      int|   NULL|
|      SickLeaveHours|      int|   NULL|
|                    |         |       |
|# Detailed Table ...|         |       |
+--------------------+---------+-------+
only showing top

Unnamed: 0,employee_key,employee_id,FirstName,MiddleName,LastName,NationalIDNumber,LoginID,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours
0,1,1,Guy,R,Gilbert,14417807,adventure-works\guy1,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15,M,M,1996-07-31,0,21,30
1,2,2,Kevin,F,Brown,253022876,adventure-works\kevin0,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03,S,M,1997-02-26,0,42,41


Populating Products Dimension

In [19]:
# Read the product dimension from its CSV extract (header row, inferred column types).

products_csv = os.path.join(batch_dir, 'dim_products.csv')
print(products_csv)

df_dim_products = spark.read.format('csv').options(header='true', inferSchema='true').load(products_csv)
# Preview twenty rows. limit() is applied in Spark, so only those rows are
# collected to the driver (toPandas().head() pulled the whole table first).
df_dim_products.limit(20).toPandas()

/Users/karukavina/Documents/VSCODEFiles/DS/FinalProject/main/final_data/adventureworks/batch/dim_products.csv


Unnamed: 0,product_key,product_id,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,...,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate
0,1,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,...,,,0,,,,,,,1998-06-01
1,2,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,...,,,0,,,,,,,1998-06-01
2,3,3,BB Ball Bearing,BE-2349,1,0,,800,600,0.0,...,,,1,,,,,,,1998-06-01
3,4,4,Headset Ball Bearings,BE-2908,0,0,,800,600,0.0,...,,,0,,,,,,,1998-06-01
4,5,316,Blade,BL-2036,1,0,,800,600,0.0,...,,,1,,,,,,,1998-06-01
5,6,317,LL Crankarm,CA-5965,0,0,Black,500,375,0.0,...,,,0,,L,,,,,1998-06-01
6,7,318,ML Crankarm,CA-6738,0,0,Black,500,375,0.0,...,,,0,,M,,,,,1998-06-01
7,8,319,HL Crankarm,CA-7457,0,0,Black,500,375,0.0,...,,,0,,,,,,,1998-06-01
8,9,320,Chainring Bolts,CB-2903,0,0,Silver,1000,750,0.0,...,,,0,,,,,,,1998-06-01
9,10,321,Chainring Nut,CN-6137,0,0,Silver,1000,750,0.0,...,,,0,,,,,,,1998-06-01


In [20]:
# Transformations: keep only the product columns stored in the lakehouse, in this order.

ordered_columns = ['product_key', 'product_id', 'Name', 'Color', 'ProductNumber', 'ProductCategory', 'SafetyStockLevel', 'ReorderPoint',
                    'StandardCost', 'ListPrice', 'DaysToManufacture', 'SellStartDate']

df_dim_products = df_dim_products[ordered_columns]
# Preview two rows without collecting the whole table to the driver.
df_dim_products.limit(2).toPandas()

Unnamed: 0,product_key,product_id,Name,Color,ProductNumber,ProductCategory,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,DaysToManufacture,SellStartDate
0,1,1,Adjustable Race,,AR-5381,,1000,750,0.0,0.0,0,1998-06-01
1,2,2,Bearing Ball,,BA-8327,,1000,750,0.0,0.0,0,1998-06-01


In [21]:
# Save the product dimension as a table in the lakehouse database, overwriting any existing table.

df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

In [22]:
# Testing: show the saved table's schema and metadata, then read two rows back to confirm the write.

spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_products;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_products LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|         product_key|                 int|   NULL|
|          product_id|                 int|   NULL|
|                Name|              string|   NULL|
|               Color|              string|   NULL|
|       ProductNumber|              string|   NULL|
|     ProductCategory|              string|   NULL|
|    SafetyStockLevel|                 int|   NULL|
|        ReorderPoint|                 int|   NULL|
|        StandardCost|              double|   NULL|
|           ListPrice|              double|   NULL|
|   DaysToManufacture|                 int|   NULL|
|       SellStartDate|                date|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|           

Unnamed: 0,product_key,product_id,Name,Color,ProductNumber,ProductCategory,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,DaysToManufacture,SellStartDate
0,1,1,Adjustable Race,,AR-5381,,1000,750,0.0,0.0,0,1998-06-01
1,2,2,Bearing Ball,,BA-8327,,1000,750,0.0,0.0,0,1998-06-01


## Part 2: Retrieving Data From MongoDB

In [23]:
client = get_mongo_client(**mongodb_args)

# Maps each MongoDB collection name to the JSON file (inside batch_dir) used to populate it.
json_files = {"fact_purchase_orders" : "fact_purchase_orders.json",
              "fact_sales_orders" : 'fact_sales_orders.json',
             }

# Drops and reloads each collection from its file; the helper also closes the client.
set_mongo_collections(client, mongodb_args["db_name"], batch_dir, json_files) 

Populating Fact Sales Orders

In [24]:
# Retrieve the sales orders from their MongoDB collection.

# get_mongodb_dataframe reads the collection name from mongodb_args, so set it first.
mongodb_args["collection"] = "fact_sales_orders"

df_fact_sales_orders = get_mongodb_dataframe(spark, **mongodb_args)
# Preview two rows. limit() is applied in Spark, so only those rows are
# collected to the driver (toPandas().head() pulled the whole collection first).
df_fact_sales_orders.limit(2).toPandas()

Unnamed: 0,AccountNumber,BillToAddressID,CarrierTrackingNumber,ContactID,Credit Card ExpMonth,Credit Card ExpYear,Credit Card Number,Credit Card Type,CreditCardApprovalCode,CustomerID,...,ShipBase,ShipDate,ShipMethod,ShipRate,ShipToAddressID,Status,SubTotal,TaxAmt,TotalDue,UnitPrice
0,10-4020-000676,985,4911-403C-98,378,2.0,2007.0,77777462752259,ColonialVoice,105041Vi84182,676,...,8.99,2001-07-08 00:00:00,CARGO TRANSPORT 5,1.49,985,5,24643.9362,1971.5149,27231.5495,20.1865
1,10-4020-000676,985,4911-403C-98,378,2.0,2007.0,77777462752259,ColonialVoice,105041Vi84182,676,...,8.99,2001-07-08 00:00:00,CARGO TRANSPORT 5,1.49,985,5,24643.9362,1971.5149,27231.5495,5.1865


In [25]:
# Transformations: fix the column order of the sales-orders fact table.

ordered_columns = ['AccountNumber', 'BillToAddressID', 'CarrierTrackingNumber', 'ContactID', 'Credit Card ExpMonth',
                   'Credit Card ExpYear', 'Credit Card Number', 'Credit Card Type', 'CreditCardApprovalCode', 'CustomerID',
                   'DueDate', 'Freight', 'OnlineOrderFlag', 'OrderDate', 'OrderQty', 'ProductID', 'PurchaseOrderNumber',
                   'RevisionNumber', 'Sales Territory', 'Sales Territory Group', 'SalesOrderID', 'SalesOrderNumber',
                   'SalesPersonID', 'ShipBase', 'ShipDate', 'ShipMethod', 'ShipRate', 'ShipToAddressID', 'Status',
                   'SubTotal', 'TaxAmt', 'TotalDue', 'UnitPrice']

df_fact_sales_orders = df_fact_sales_orders[ordered_columns]
# Preview two rows without collecting the whole table to the driver.
df_fact_sales_orders.limit(2).toPandas()

Unnamed: 0,AccountNumber,BillToAddressID,CarrierTrackingNumber,ContactID,Credit Card ExpMonth,Credit Card ExpYear,Credit Card Number,Credit Card Type,CreditCardApprovalCode,CustomerID,...,ShipBase,ShipDate,ShipMethod,ShipRate,ShipToAddressID,Status,SubTotal,TaxAmt,TotalDue,UnitPrice
0,10-4020-000676,985,4911-403C-98,378,2.0,2007.0,77777462752259,ColonialVoice,105041Vi84182,676,...,8.99,2001-07-08 00:00:00,CARGO TRANSPORT 5,1.49,985,5,24643.9362,1971.5149,27231.5495,20.1865
1,10-4020-000676,985,4911-403C-98,378,2.0,2007.0,77777462752259,ColonialVoice,105041Vi84182,676,...,8.99,2001-07-08 00:00:00,CARGO TRANSPORT 5,1.49,985,5,24643.9362,1971.5149,27231.5495,5.1865


In [26]:
# Save the sales-orders fact data as a table in the lakehouse database, overwriting any existing table.

df_fact_sales_orders.write.saveAsTable(f"{dest_database}.fact_sales_orders", mode="overwrite")

In [27]:
# Testing: show the saved table's schema and metadata, then read two rows back to confirm the write.

spark.sql(f"DESCRIBE EXTENDED {dest_database}.fact_sales_orders;").show()
spark.sql(f"SELECT * FROM {dest_database}.fact_sales_orders LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       AccountNumber|   string|   NULL|
|     BillToAddressID|      int|   NULL|
|CarrierTrackingNu...|   string|   NULL|
|           ContactID|      int|   NULL|
|Credit Card ExpMonth|      int|   NULL|
| Credit Card ExpYear|      int|   NULL|
|  Credit Card Number|   string|   NULL|
|    Credit Card Type|   string|   NULL|
|CreditCardApprova...|   string|   NULL|
|          CustomerID|      int|   NULL|
|             DueDate|   string|   NULL|
|             Freight|   double|   NULL|
|     OnlineOrderFlag|   string|   NULL|
|           OrderDate|   string|   NULL|
|            OrderQty|      int|   NULL|
|           ProductID|      int|   NULL|
| PurchaseOrderNumber|   string|   NULL|
|      RevisionNumber|      int|   NULL|
|     Sales Territory|   string|   NULL|
|Sales Territory G...|   string|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,AccountNumber,BillToAddressID,CarrierTrackingNumber,ContactID,Credit Card ExpMonth,Credit Card ExpYear,Credit Card Number,Credit Card Type,CreditCardApprovalCode,CustomerID,...,ShipBase,ShipDate,ShipMethod,ShipRate,ShipToAddressID,Status,SubTotal,TaxAmt,TotalDue,UnitPrice
0,10-4020-000676,985,4911-403C-98,378,2,2007,77777462752259,ColonialVoice,105041Vi84182,676,...,8.99,2001-07-08 00:00:00,CARGO TRANSPORT 5,1.49,985,5,24643.9362,1971.5149,27231.5495,20.1865
1,10-4020-000676,985,4911-403C-98,378,2,2007,77777462752259,ColonialVoice,105041Vi84182,676,...,8.99,2001-07-08 00:00:00,CARGO TRANSPORT 5,1.49,985,5,24643.9362,1971.5149,27231.5495,5.1865


## Part 3: Retrieving Data from MySQL

Populating Date Dimension

In [28]:
# Retrieve the date dimension from the dim_date table in MySQL (over JDBC, using mysql_args).

sql_dim_date = "SELECT * FROM dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)

In [29]:
# Save the date dimension as a table in the lakehouse database, overwriting any existing table.
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

In [30]:
# Testing: show the saved table's schema and metadata, then read two rows back to confirm the write.

spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|  tinyint|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|  tinyint|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|  tinyint|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|  tinyint|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|  tinyint|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|  tinyint|   NULL|
|      fiscal_quarter|  tinyint|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


Verify Tables

In [31]:
# Switch to the lakehouse database and list its tables to confirm every dimension and fact table was created.
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,adventureworks_dlh,dim_customers,False
1,adventureworks_dlh,dim_date,False
2,adventureworks_dlh,dim_employees,False
3,adventureworks_dlh,dim_products,False
4,adventureworks_dlh,fact_sales_orders,False


### Data Integration

Verify Source Data Files Location

In [32]:
# List the files in the streaming directory that the bronze reader will pick up.
get_file_info(orders_stream_dir)

Unnamed: 0,name,size,modification_time
0,sales_orders_01.json,307788,2025-12-19 22:07:25.964736223
1,sales_orders_02.json,305195,2025-12-19 22:07:25.972462654
2,sales_orders_03.json,309566,2025-12-19 22:07:25.978381157


## Bronze Layer

In [33]:
# Bronze source: stream the JSON files from the sales-orders directory,
# one file per trigger, parsing each file as a multi-line JSON document.
# NOTE(review): "schemaLocation" may have no effect on the plain JSON file
# source; the schema is presumably inferred via
# spark.sql.streaming.schemaInference. Confirm.

df_orders_bronze = (
    spark.readStream
    .option("schemaLocation", orders_output_bronze)
    .option("maxFilesPerTrigger", 1)
    .option("multiLine", "true")
    .json(orders_stream_dir)
)

df_orders_bronze.isStreaming

True

In [34]:
# Bronze sink: append the raw orders to Parquet, tagging each record with its
# arrival time and the file it came from.

orders_checkpoint_bronze = os.path.join(orders_output_bronze, '_checkpoint')

orders_bronze_query = (
    df_orders_bronze
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    .writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("orders_bronze")
    .trigger(availableNow=True)
    .option("checkpointLocation", orders_checkpoint_bronze)
    .option("compression", "snappy")
    .start(orders_output_bronze)
)

In [35]:
# Testing: query monitoring. The status is sampled right after start(),
# so it can still read "Initializing sources".

print(f"Query ID: {orders_bronze_query.id}")
print(f"Query Name: {orders_bronze_query.name}")
print(f"Query Status: {orders_bronze_query.status}")

Query ID: 97977215-f813-45f4-acf2-4c7cecce6ed2
Query Name: orders_bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [36]:
# Block until the bronze query has terminated, so the silver layer reads complete bronze output.
orders_bronze_query.awaitTermination()

## Silver Layer

In [37]:
# Silver: stream the bronze Parquet output and left-join it to the static
# dimension DataFrames, keeping the surrogate keys and the order measures.

df_orders_silver = (
    spark.readStream.format("parquet").load(orders_output_bronze)
    .join(df_dim_customers, col('CustomerID') == df_dim_customers.customer_id, "left")
    .join(df_dim_employees, col('SalesPersonID') == df_dim_employees.employee_id, "left")
    .join(df_dim_products, col('ProductID') == df_dim_products.product_id, "left")
    .join(df_dim_date, to_date(col("OrderDate")) == df_dim_date.full_date, "left")
    .select(
        col("SalesOrderID").cast(LongType()),
        df_dim_customers.customer_key.cast(LongType()),
        df_dim_employees.employee_key.cast(LongType()),
        df_dim_products.product_key.cast(LongType()),
        df_dim_date.date_key.cast(LongType()).alias("order_date_key"),
        col("OrderQty"),
        col("UnitPrice"),
        col("TotalDue"),
    )
)

df_orders_silver.isStreaming

True

In [38]:
# Silver sink: append the joined orders to Parquet in the lakehouse.

orders_checkpoint_silver = os.path.join(orders_output_silver, '_checkpoint')

orders_silver_query = (
    df_orders_silver.writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("orders_silver")
    .trigger(availableNow=True)
    .option("checkpointLocation", orders_checkpoint_silver)
    .option("compression", "snappy")
    .start(orders_output_silver)
)

In [39]:
# Testing: query monitoring. The status is sampled right after start(),
# so it can still read "Initializing sources".

print(f"Query ID: {orders_silver_query.id}")
print(f"Query Name: {orders_silver_query.name}")
print(f"Query Status: {orders_silver_query.status}")

Query ID: fe9f543e-411a-4716-bf35-77fe6048abb2
Query Name: orders_silver
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [40]:
# Block until the silver query has terminated, so the gold layer reads complete silver output.
orders_silver_query.awaitTermination()

In [41]:
# Debug aid: list the product-dimension columns available to the gold-layer join and grouping below.
print(df_dim_products.columns)

['product_key', 'product_id', 'Name', 'Color', 'ProductNumber', 'ProductCategory', 'SafetyStockLevel', 'ReorderPoint', 'StandardCost', 'ListPrice', 'DaysToManufacture', 'SellStartDate']


## Gold Layer

In [42]:
# Gold: business report aggregating the silver stream per calendar month and
# product category.
# count/sum are the pyspark.sql.functions versions brought in by the star
# import, not the Python builtins.
# The chain is parenthesised; it previously ended in a dangling "\" that
# continued the expression onto a whitespace-only line.

df_orders_gold = (
    spark.readStream.format("parquet").load(orders_output_silver)
    .join(df_dim_products, "product_key")
    .join(df_dim_date, df_dim_date.date_key == col("order_date_key"))
    .groupBy("month_of_year", "ProductCategory", "month_name")
    .agg(count("product_key").alias("product_count"),
         sum("TotalDue").alias("total_sales"))
)

In [43]:
# Gold sink: keep the complete aggregate in memory. The memory sink exposes it
# as a table named after the query ("fact_sales_orders_gold"), which the
# following cells read with spark.sql.

orders_gold_query = (
    df_orders_gold.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("fact_sales_orders_gold")
    .start()
)

# Do not stop the session here: the cells below still wait on this query and
# run SQL against the in-memory table. spark.stop() belongs only in the final cell.

In [44]:
# Wait until the gold query has reported at least one batch, so the in-memory table has data.
wait_until_stream_is_ready(orders_gold_query , 1)

The stream has processed 1 batchs


In [45]:
# Query the gold data from the in-memory table created by the "fact_sales_orders_gold" streaming query.

df_fact_sales_orders_gold = spark.sql("SELECT * FROM fact_sales_orders_gold")
df_fact_sales_orders_gold.printSchema()

root
 |-- month_of_year: byte (nullable = true)
 |-- ProductCategory: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- product_count: long (nullable = false)
 |-- total_sales: double (nullable = true)



In [46]:
# Final report: expose the gold aggregate under presentation-friendly column names.

df_fact_sales_orders_gold_final = df_fact_sales_orders_gold.select(
    col("month_name").alias("Month"),
    col("ProductCategory").alias("Product Category"),
    col("product_count").alias("Product Count"),
    col("total_sales").alias("Total Sales"),
)

In [47]:
# Save the final report as a lakehouse table (overwriting any existing one), then read it back for display.

df_fact_sales_orders_gold_final.write.saveAsTable(f"{dest_database}.fact_sales_orders_gold", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.fact_sales_orders_gold").toPandas()

Unnamed: 0,Month,Product Category,Product Count,Total Sales
0,July,Bikes,344,5645998.0
1,July,Accessories,37,966885.7
2,August,Bikes,304,12530980.0
3,August,Clothing,79,4703070.0
4,August,Components,120,6845573.0
5,August,Accessories,42,2375518.0
6,July,Components,61,2026928.0
7,July,Clothing,56,1405746.0


### Demonstration

In [48]:
# Demonstration: total sales per product category, highest first.
# The database name comes from dest_database, as in every other query in this
# notebook, instead of being hard-coded.
sql_sales_by_category = f"""
SELECT `Product Category` AS product_category,
    SUM(`Total Sales`) AS total_sales
FROM {dest_database}.fact_sales_orders_gold
GROUP BY `Product Category`
ORDER BY total_sales DESC
"""

spark.sql(sql_sales_by_category).toPandas()


Unnamed: 0,product_category,total_sales
0,Bikes,18176980.0
1,Components,8872501.0
2,Clothing,6108816.0
3,Accessories,3342404.0


In [49]:
# Stop the Spark session now that all work is finished.
spark.stop()