# DS-2002 Data Project 2 (Capstone): AdventureWorks Data Lakehouse

In [1]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W
from pymongo import MongoClient

/opt/miniconda3/envs/pysparkenv/lib/python3.12/site-packages/pyspark


### Global Variables

In [2]:
# Specify MySQL Server Connection Information
mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "adventureworks",
    "conn_props" : {
        "user" : "root",
        "password" : "tpwk",
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}

# Specify MongoDB Cluster Connection Information
mongodb_args = {
    "cluster_location" : "local",
    "user_name" : "your_username",
    "password" : "your_password",
    "cluster_name" : "cluster0",
    "cluster_subnet" : "ep8zqil",
    "uri": "mongodb://localhost:27017",
    "db_name" : "adventureworks",
    "collection" : "",
    "null_column_threshold" : 0.5
}

# Specify Directory Structure for Source Data
base_dir = '/Users/lilyliau/Desktop/DS 2002/Final/data'
data_dir = os.path.join(base_dir, 'adventureworks')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

# Streaming data directories for fact tables
orders_stream_dir = orders_stream_dir = os.path.join(stream_dir, 'sales_orders')

# Create Directory Structure for Data Lakehouse Files
dest_database = "adventureworks_dlh"
sql_warehouse_dir = sql_warehouse_dir = os.path.abspath('/Users/lilyliau/Desktop/DS 2002/Final/spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

# Medallion Architecture Output Directories
orders_output_bronze = os.path.join(database_dir, 'fact_sales_orders', 'bronze')
orders_output_silver = os.path.join(database_dir, 'fact_sales_orders', 'silver')
orders_output_gold = os.path.join(database_dir, 'fact_sales_orders', 'gold')

In [3]:
sales_json_path = os.path.join(stream_dir, 'fact_sales_orders.json')
with open(sales_json_path, 'r') as f:
    sales_data = json.load(f)

total_records = len(sales_data)
chunk_size = total_records // 3

split_stream_dir = os.path.join(stream_dir, 'sales_orders')
os.makedirs(split_stream_dir, exist_ok=True)

for i in range(3):
    start_idx = i * chunk_size
    end_idx = total_records if i == 2 else (i + 1) * chunk_size
    
    chunk_data = sales_data[start_idx:end_idx]
    output_path = os.path.join(split_stream_dir, f'sales_orders_{i+1:02d}.json')
    
    with open(output_path, 'w') as f:
        json.dump(chunk_data, f)
    
    print(f"Created {output_path} with {len(chunk_data)} records")

Created /Users/lilyliau/Desktop/DS 2002/Final/data/adventureworks/streaming/sales_orders/sales_orders_01.json with 333 records
Created /Users/lilyliau/Desktop/DS 2002/Final/data/adventureworks/streaming/sales_orders/sales_orders_02.json with 333 records
Created /Users/lilyliau/Desktop/DS 2002/Final/data/adventureworks/streaming/sales_orders/sales_orders_03.json with 334 records


### Global Functions

In [4]:
def get_file_info(path: str):
    """Fetch file information from a directory."""
    file_sizes = []
    modification_times = []

    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    """Wait for streaming query to process minimum batches."""
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
    print(f"The stream has processed {len(query.recentProgress)} batches")


def remove_directory_tree(path: str):
    """Remove directory tree for idempotency."""
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
    except Exception as e:
        return f"An error occurred: {e}"


def drop_null_columns(df, threshold):
    """Drop columns with NULL values exceeding threshold."""
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold] 
    df_dropped = df.drop(*columns_with_nulls) 
    return df_dropped


def get_mysql_dataframe(spark_session, sql_query: str, **args):
    """Query MySQL database and return a Spark DataFrame."""
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}?useSSL=false&allowPublicKeyRetrieval=true"
    
    dframe = spark_session.read.format("jdbc") \
        .option("url", jdbc_url) \
        .option("driver", args['conn_props']['driver']) \
        .option("user", args['conn_props']['user']) \
        .option("password", args['conn_props']['password']) \
        .option("query", sql_query) \
        .load()
    
    return dframe


def get_mongo_uri(**args):
    """Build MongoDB connection URI."""
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("Specify 'atlas' or 'local' for cluster_location.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_mongo_client(**args):
    """Get MongoDB client connection."""
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())
    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
    else:
        raise Exception("MongoDB Client could not be created.")
    return client


def set_mongo_collections(mongo_client, db_name: str, data_directory: str, json_files: dict):
    """Load JSON files into MongoDB collections."""
    db = mongo_client[db_name]
    
    for collection_name, filename in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(data_directory, filename)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            collection = db[collection_name]
            result = collection.insert_many(json_object)
        print(f"Loaded {len(json_object)} documents into '{collection_name}' collection.")
        
    mongo_client.close()


def get_mongodb_dataframe(spark_session, **args):
    """
    Fetch data from MongoDB and return as Spark DataFrame.
    Uses PyMongo to load data, then converts via Pandas to Spark.
    This workaround addresses Spark-MongoDB connector compatibility issues.
    """
    # Connect via PyMongo
    client = MongoClient(args['uri'])
    db = client[args['db_name']]
    collection = db[args['collection']]
    
    # Fetch all documents (exclude _id)
    docs = list(collection.find({}, {'_id': 0}))
    client.close()
    
    if not docs:
        raise ValueError(f"No documents found in collection: {args['collection']}")
    
    # Convert to Spark DataFrame via Pandas
    pdf = pd.DataFrame(docs)
    dframe = spark_session.createDataFrame(pdf)
    
    # Drop columns with excessive nulls
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe

### Data Lakehouse Directory Structure

In [5]:
remove_directory_tree(database_dir)

"Directory '/Users/lilyliau/Desktop/DS 2002/Final/spark-warehouse/adventureworks_dlh.db' has been removed successfully."

### New Spark Session

In [6]:
mysql_spark_jar = "/Users/lilyliau/Desktop/DS 2002/04-PySpark/mysql-connector-j-9.1.0/mysql-connector-j-9.1.0.jar"
spark = SparkSession.builder \
    .appName("AdventureWorks Data Lakehouse (Medallion Architecture)") \
    .master(f"local[{int(os.cpu_count()/2)}]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.jars", mysql_spark_jar) \
    .config("spark.sql.warehouse.dir", sql_warehouse_dir) \
    .config("spark.sql.adaptive.enabled", "false") \
    .config("spark.sql.shuffle.partitions", int(os.cpu_count())) \
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
spark

25/12/15 22:12:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### New Metadata Database

In [7]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'AdventureWorks Data Lakehouse - DS-2002 Capstone Project'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Final Project');
"""
spark.sql(sql_create_db)
spark.sql(f"USE {dest_database}")

DataFrame[]


### Populate Dimensions by Ingesting "Cold-path" Reference Data

In [8]:
get_file_info(batch_dir)

Unnamed: 0,name,size,modification_time
0,.DS_Store,6148,2025-11-30 19:35:15.699223280
1,adventureworks_products.json,306034,2025-11-27 17:22:59.386832952
2,adventureworks_vendors.json,41885,2025-11-27 17:23:11.923132896
3,dim_customers.csv,134341,2025-11-26 17:37:01.642629623
4,dim_date.csv,138125,2025-11-26 17:39:27.810449123
5,dim_employees.csv,48470,2025-11-26 17:37:17.227450609


In [9]:
employees_csv = os.path.join(batch_dir, 'dim_employees.csv')
print(f"Source: {employees_csv}")

df_dim_employees = spark.read.format('csv').options(header='true', inferSchema='true').load(employees_csv)
df_dim_employees.toPandas().head(2)

Source: /Users/lilyliau/Desktop/DS 2002/Final/data/adventureworks/batch/dim_employees.csv


Unnamed: 0,employee_key,EmployeeID,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,1,14417807,adventure-works\guy1,16,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15,M,M,1996-07-31,0,21,30,1
1,2,2,253022876,adventure-works\kevin0,6,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03,S,M,1997-02-26,0,42,41,1


In [10]:
ordered_columns = ['employee_key', 'EmployeeID', 'FirstName', 'MiddleName', 'LastName',
                   'Title', 'EmailAddress', 'Phone', 'BirthDate', 'HireDate',
                   'MaritalStatus', 'Gender', 'SalariedFlag', 'VacationHours', 
                   'SickLeaveHours', 'CurrentFlag']

df_dim_employees = df_dim_employees.select([c for c in ordered_columns if c in df_dim_employees.columns])
df_dim_employees.toPandas().head(2)

Unnamed: 0,employee_key,EmployeeID,FirstName,MiddleName,LastName,Title,EmailAddress,Phone,BirthDate,HireDate,MaritalStatus,Gender,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,1,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,320-555-0195,1972-05-15,1996-07-31,M,M,0,21,30,1
1,2,2,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,150-555-0189,1977-06-03,1997-02-26,S,M,0,42,41,1


In [11]:
df_dim_employees.write.saveAsTable(f"{dest_database}.dim_employees", mode="overwrite")

                                                                                

In [12]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_employees;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_employees LIMIT 2").toPandas()

+--------------------+------------------+-------+
|            col_name|         data_type|comment|
+--------------------+------------------+-------+
|        employee_key|               int|   NULL|
|          EmployeeID|               int|   NULL|
|           FirstName|            string|   NULL|
|          MiddleName|            string|   NULL|
|            LastName|            string|   NULL|
|               Title|            string|   NULL|
|        EmailAddress|            string|   NULL|
|               Phone|            string|   NULL|
|           BirthDate|              date|   NULL|
|            HireDate|              date|   NULL|
|       MaritalStatus|            string|   NULL|
|              Gender|            string|   NULL|
|        SalariedFlag|               int|   NULL|
|       VacationHours|               int|   NULL|
|      SickLeaveHours|               int|   NULL|
|         CurrentFlag|               int|   NULL|
|                    |                  |       |


Unnamed: 0,employee_key,EmployeeID,FirstName,MiddleName,LastName,Title,EmailAddress,Phone,BirthDate,HireDate,MaritalStatus,Gender,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,1,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,320-555-0195,1972-05-15,1996-07-31,M,M,0,21,30,1
1,2,2,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,150-555-0189,1977-06-03,1997-02-26,S,M,0,42,41,1


In [13]:
customers_csv = os.path.join(batch_dir, 'dim_customers.csv')
print(f"Source: {customers_csv}")

df_dim_customers = spark.read.format('csv').options(header='true', inferSchema='true').load(customers_csv)
df_dim_customers.toPandas().head(2)

Source: /Users/lilyliau/Desktop/DS 2002/Final/data/adventureworks/batch/dim_customers.csv


Unnamed: 0,customer_key,CustomerID,AccountNumber,CustomerType,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,IsOnlyStateProvinceFlag,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,1,AW00000001,S,Main Office,2251 Elliot Avenue,,Seattle,WA,Washington,0,98104,US,United States,North America,Northwest
1,2,2,AW00000002,S,Shipping,7943 Walnut Ave,,Renton,WA,Washington,0,98055,US,United States,North America,Northwest


In [14]:
ordered_columns = ['customer_key', 'CustomerID', 'AccountNumber', 'CustomerType',
                   'AddressLine1', 'AddressLine2', 'City', 'StateProvinceCode',
                   'State_Province', 'PostalCode', 'CountryRegionCode', 'Country_Region',
                   'Sales Territory Group', 'Sales Territory']

df_dim_customers = df_dim_customers.select([c for c in ordered_columns if c in df_dim_customers.columns])
df_dim_customers.toPandas().head(2)

Unnamed: 0,customer_key,CustomerID,AccountNumber,CustomerType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,1,AW00000001,S,2251 Elliot Avenue,,Seattle,WA,Washington,98104,US,United States,North America,Northwest
1,2,2,AW00000002,S,7943 Walnut Ave,,Renton,WA,Washington,98055,US,United States,North America,Northwest


In [15]:
df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")

In [16]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_customers;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_customers LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|        customer_key|                 int|   NULL|
|          CustomerID|                 int|   NULL|
|       AccountNumber|              string|   NULL|
|        CustomerType|              string|   NULL|
|        AddressLine1|              string|   NULL|
|        AddressLine2|              string|   NULL|
|                City|              string|   NULL|
|   StateProvinceCode|              string|   NULL|
|      State_Province|              string|   NULL|
|          PostalCode|              string|   NULL|
|   CountryRegionCode|              string|   NULL|
|      Country_Region|              string|   NULL|
|Sales Territory G...|              string|   NULL|
|     Sales Territory|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|           

Unnamed: 0,customer_key,CustomerID,AccountNumber,CustomerType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,1,AW00000001,S,2251 Elliot Avenue,,Seattle,WA,Washington,98104,US,United States,North America,Northwest
1,2,2,AW00000002,S,7943 Walnut Ave,,Renton,WA,Washington,98055,US,United States,North America,Northwest




### Reference Data from MongoDB

In [17]:
client = get_mongo_client(**mongodb_args)

json_files = {
    "products": "adventureworks_products.json"
}

set_mongo_collections(client, mongodb_args["db_name"], batch_dir, json_files)

Loaded 504 documents into 'products' collection.


In [18]:
mongodb_args["collection"] = "products"
df_dim_products = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_products.toPandas().head(2)

                                                                                

Unnamed: 0,product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Weight,DaysToManufacture,ProductLine,ProductCategory,ProductSubcategory,ProductModel,SellStartDate
0,1,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,0.0,,0,,,,,1998-06-01
1,2,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,0.0,,0,,,,,1998-06-01


In [19]:
ordered_columns = ['product_key', 'ProductID', 'Name', 'ProductCategory', 'ProductSubcategory',
                   'ProductNumber', 'ProductModel', 'ProductLine', 'Class', 'Style', 'Color',
                   'Size', 'SizeUnitMeasureCode', 'Weight', 'WeightUnitMeasureCode',
                   'ListPrice', 'StandardCost', 'DaysToManufacture', 'SafetyStockLevel',
                   'ReorderPoint', 'MakeFlag', 'FinishedGoodsFlag', 'SellStartDate']

available_cols = [c for c in ordered_columns if c in df_dim_products.columns]
df_dim_products = df_dim_products.select(available_cols)
df_dim_products.toPandas().head(2)

Unnamed: 0,product_key,ProductID,Name,ProductCategory,ProductSubcategory,ProductNumber,ProductModel,ProductLine,Color,Weight,ListPrice,StandardCost,DaysToManufacture,SafetyStockLevel,ReorderPoint,MakeFlag,FinishedGoodsFlag,SellStartDate
0,1,1,Adjustable Race,,,AR-5381,,,,,0.0,0.0,0,1000,750,0,0,1998-06-01
1,2,2,Bearing Ball,,,BA-8327,,,,,0.0,0.0,0,1000,750,0,0,1998-06-01


In [20]:
df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

In [21]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_products;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_products LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|         product_key|   bigint|   NULL|
|           ProductID|   bigint|   NULL|
|                Name|   string|   NULL|
|     ProductCategory|   string|   NULL|
|  ProductSubcategory|   string|   NULL|
|       ProductNumber|   string|   NULL|
|        ProductModel|   string|   NULL|
|         ProductLine|   string|   NULL|
|               Color|   string|   NULL|
|              Weight|   double|   NULL|
|           ListPrice|   double|   NULL|
|        StandardCost|   double|   NULL|
|   DaysToManufacture|   bigint|   NULL|
|    SafetyStockLevel|   bigint|   NULL|
|        ReorderPoint|   bigint|   NULL|
|            MakeFlag|   string|   NULL|
|   FinishedGoodsFlag|   string|   NULL|
|       SellStartDate|   string|   NULL|
|                    |         |       |
|# Detailed Table ...|         |       |
+--------------------+---------+-------+
only showing top

Unnamed: 0,product_key,ProductID,Name,ProductCategory,ProductSubcategory,ProductNumber,ProductModel,ProductLine,Color,Weight,ListPrice,StandardCost,DaysToManufacture,SafetyStockLevel,ReorderPoint,MakeFlag,FinishedGoodsFlag,SellStartDate
0,379,874,"Racing Socks, M",Clothing,Socks,SO-R809-M,Racing Socks,R,White,,8.99,3.3623,0,4,3,0,1,2003-07-01
1,380,875,"Racing Socks, L",Clothing,Socks,SO-R809-L,Racing Socks,R,White,,8.99,3.3623,0,4,3,0,1,2003-07-01



### Reference Data from MySQL

In [22]:
sql_query = "SELECT * FROM dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_query, **mysql_args)
print(f"Loaded {df_dim_date.count()} rows from MySQL dim_date table")
df_dim_date.toPandas().head(2)

Loaded 4018 rows from MySQL dim_date table


Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [23]:
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

In [24]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|      int|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|      int|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|      int|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|      int|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|      int|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|      int|   NULL|
|      fiscal_quarter|      int|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


### Verify Dimension Tables

In [25]:
spark.sql(f"SHOW TABLES IN {dest_database}").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,adventureworks_dlh,dim_customers,False
1,adventureworks_dlh,dim_date,False
2,adventureworks_dlh,dim_employees,False
3,adventureworks_dlh,dim_products,False



### Integrate Reference Data with Real-Time (Streaming) Data

In [26]:
get_file_info(orders_stream_dir)

Unnamed: 0,name,size,modification_time
0,sales_orders_01.json,313008,2025-12-16 03:12:40.124758720
1,sales_orders_02.json,310361,2025-12-16 03:12:40.136465073
2,sales_orders_03.json,315194,2025-12-16 03:12:40.147641182


## Bronze Layer

In [27]:
df_orders_bronze = (
    spark.readStream
    .option("schemaLocation", orders_output_bronze)
    .option("maxFilesPerTrigger", 1)
    .option("multiLine", "true")
    .json(orders_stream_dir)
    .filter(col("fact_sales_orders_key").isNotNull())
)

In [28]:
orders_checkpoint_bronze = os.path.join(orders_output_bronze, '_checkpoint')

orders_bronze_query = (
    df_orders_bronze
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    .writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("orders_bronze")
    .trigger(availableNow=True)
    .option("checkpointLocation", orders_checkpoint_bronze)
    .option("compression", "snappy")
    .start(orders_output_bronze)
)

In [29]:
print(f"Query ID: {orders_bronze_query.id}")
print(f"Query Name: {orders_bronze_query.name}")
print(f"Query Status: {orders_bronze_query.status}")

Query ID: 51c9a068-4c58-4846-a393-f5cd0c354672
Query Name: orders_bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [30]:
orders_bronze_query.awaitTermination()

25/12/15 22:13:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [31]:
df_orders_bronze = spark.read.format("parquet").load(orders_output_bronze)
df_orders_bronze.printSchema()
df_orders_bronze.show(5)

root
 |-- AccountNumber: string (nullable = true)
 |-- BillToAddressID: long (nullable = true)
 |-- CarrierTrackingNumber: string (nullable = true)
 |-- ContactID: long (nullable = true)
 |-- Credit Card ExpMonth: long (nullable = true)
 |-- Credit Card ExpYear: long (nullable = true)
 |-- Credit Card Number: string (nullable = true)
 |-- Credit Card Type: string (nullable = true)
 |-- CreditCardApprovalCode: string (nullable = true)
 |-- Freight: double (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- OnlineOrderFlag: string (nullable = true)
 |-- OrderQty: long (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- RevisionNumber: long (nullable = true)
 |-- Sales Territory: string (nullable = true)
 |-- Sales Territory Group: string (nullable = true)
 |-- SalesOrderID: long (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesPersonID: long (nullable = true)
 |-- ShipBase: double (nullable = true)
 |-- ShipMethod: string (null

## Silver Layer

In [32]:
df_dim_customers_renamed = df_dim_customers \
    .withColumnRenamed("AccountNumber", "cust_AccountNumber") \
    .withColumnRenamed("Sales Territory", "cust_Sales_Territory") \
    .withColumnRenamed("Sales Territory Group", "cust_Sales_Territory_Group")

df_dim_employees_renamed = df_dim_employees \
    .withColumnRenamed("EmployeeID", "emp_EmployeeID")

df_orders_silver = spark.readStream.format("parquet").load(orders_output_bronze) \
    .join(df_dim_customers_renamed, "customer_key", "left") \
    .join(df_dim_products, "product_key", "left") \
    .join(df_dim_employees_renamed, col("SalesPersonID") == col("emp_EmployeeID"), "left") \
    .join(df_dim_date, col("sales_order_date_key") == col("date_key"), "left")

In [33]:
orders_checkpoint_silver = os.path.join(orders_output_silver, '_checkpoint')

orders_silver_query = (
    df_orders_silver.writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("orders_silver")
    .trigger(availableNow=True)
    .option("checkpointLocation", orders_checkpoint_silver)
    .option("compression", "snappy")
    .start(orders_output_silver)
)

In [34]:
print(f"Query ID: {orders_silver_query.id}")
print(f"Query Name: {orders_silver_query.name}")
print(f"Query Status: {orders_silver_query.status}")

Query ID: adef3f75-805a-4545-803e-c897122c5423
Query Name: orders_silver
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [35]:
orders_silver_query.awaitTermination()
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {dest_database}.fact_sales_orders
    USING parquet
    LOCATION '{orders_output_silver}'
""")

DataFrame[]

## Gold Layer

In [36]:
df_orders_gold = spark.readStream.format("parquet").load(orders_output_silver) \
    .groupBy("month_of_year", "month_name", "ProductCategory") \
    .agg(
        count("*").alias("order_count"),
        sum("OrderQty").alias("total_quantity"),
        sum("LineTotal").alias("total_sales")
    ) \
    .orderBy(asc("month_of_year"), desc("total_sales"))

In [37]:
orders_checkpoint_gold = os.path.join(orders_output_gold, '_checkpoint')

orders_gold_query = (
    df_orders_gold.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("fact_sales_by_category")
    .option("checkpointLocation", orders_checkpoint_gold)
    .start()
)

In [38]:
wait_until_stream_is_ready(orders_gold_query, 1)

                                                                                

The stream has processed 1 batches


In [39]:
df_gold_results = spark.sql("SELECT * FROM fact_sales_by_category")
df_gold_results.toPandas()

Unnamed: 0,month_of_year,month_name,ProductCategory,order_count,total_quantity,total_sales
0,7,July,Bikes,344,601,926619.9617
1,7,July,Components,61,114,31525.9604
2,7,July,Clothing,56,167,2875.1536
3,7,July,Accessories,37,84,1695.666
4,8,August,Bikes,274,737,995142.475048
5,8,August,Components,115,221,93385.9384
6,8,August,Clothing,72,264,4433.413614
7,8,August,Accessories,41,115,2321.4475


In [40]:
df_gold_results.write.saveAsTable(f"{dest_database}.fact_sales_by_category", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.fact_sales_by_category").toPandas()

Unnamed: 0,month_of_year,month_name,ProductCategory,order_count,total_quantity,total_sales
0,7,July,Clothing,56,167,2875.1536
1,7,July,Accessories,37,84,1695.666
2,8,August,Clothing,72,264,4433.413614
3,8,August,Accessories,41,115,2321.4475
4,7,July,Bikes,344,601,926619.9617
5,7,July,Components,61,114,31525.9604
6,8,August,Bikes,274,737,995142.475048
7,8,August,Components,115,221,93385.9384


In [41]:
orders_gold_query.stop()

## Analytical Queries

In [42]:
## top-selling products
spark.sql(f"""
    SELECT 
        p.Name AS product_name,
        p.ProductCategory AS category,
        SUM(f.OrderQty) AS total_quantity,
        SUM(f.LineTotal) AS total_revenue
    FROM {dest_database}.fact_sales_orders f
    JOIN {dest_database}.dim_products p 
        ON f.product_key = p.product_key
    GROUP BY p.Name, p.ProductCategory
    ORDER BY total_revenue DESC
    LIMIT 10
""").toPandas()


Unnamed: 0,product_name,category,total_quantity,total_revenue
0,"Mountain-100 Black, 44",Bikes,77,164024.514
1,"Mountain-100 Silver, 44",Bikes,74,160620.967584
2,"Mountain-100 Silver, 38",Bikes,69,146199.57
3,"Mountain-100 Black, 38",Bikes,69,141001.682216
4,"Mountain-100 Black, 42",Bikes,67,137024.594
5,"Road-150 Red, 56",Bikes,52,133111.644
6,"Mountain-100 Black, 48",Bikes,63,131624.61
7,"Mountain-100 Silver, 42",Bikes,55,111911.350848
8,"Road-150 Red, 48",Bikes,28,100191.56
9,"Mountain-100 Silver, 48",Bikes,47,99959.706


## Final Tables Summary

In [43]:
spark.sql(f"SHOW TABLES IN {dest_database}").show()


+------------------+--------------------+-----------+
|         namespace|           tableName|isTemporary|
+------------------+--------------------+-----------+
|adventureworks_dlh|       dim_customers|      false|
|adventureworks_dlh|            dim_date|      false|
|adventureworks_dlh|       dim_employees|      false|
|adventureworks_dlh|        dim_products|      false|
|adventureworks_dlh|fact_sales_by_cat...|      false|
|adventureworks_dlh|   fact_sales_orders|      false|
|                  |fact_sales_by_cat...|       true|
+------------------+--------------------+-----------+



In [44]:
spark.stop()