# DS-2002 Data Project 2: Northwind Data Lakehouse
**Student**: Sebastian Baldeon

## Project Overview
This project demonstrates building a dimensional Data Lakehouse using the Medallion Architecture (Bronze → Silver → Gold) with PySpark Structured Streaming. The solution integrates data from multiple sources:


## Section I: Setup and Configuration
### 1.0. Import Required Libraries

In [None]:
import findspark
findspark.init()
print(findspark.find())
import config.py as config

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

c:\Users\sebas\ds2002\.venv\Lib\site-packages\pyspark


### 2.0. Configure Connection Parameters

In [3]:
# --------------------------------------------------------------------------------
# MySQL Configuration (Local OLTP Source)
# --------------------------------------------------------------------------------
mysql_args = {
    "host_name" : "127.0.0.1",
    "port" : "3306",
    "db_name" : "northwind",
    "conn_props" : {
        "user" : MYSQL_USER,
        "password" : MYSQL_PASSWORD,
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}

# --------------------------------------------------------------------------------
# MongoDB Atlas Configuration (NoSQL Cloud Source)
# --------------------------------------------------------------------------------
mongodb_args = {
    "cluster_location" : "atlas",
    "user_name" : MONGODB_USER,
    "password" : MONGODB_PASSWORD,
    "cluster_name" : "ds2002-lab6",
    "cluster_subnet" : "fsfchfq",
    "db_name" : "northwind_project2",
    "collection" : "",
    "null_column_threshold" : 0.5
}

# --------------------------------------------------------------------------------
# Directory Structure for Streaming Data and Output
# --------------------------------------------------------------------------------
base_dir = os.getcwd()
streaming_data_dir = os.path.join(base_dir, 'streaming_data')

# Data Lakehouse Output Directories
dest_database = "northwind_project2_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

# Bronze, Silver, Gold paths for fact_orders
orders_output_bronze = os.path.join(database_dir, 'fact_orders', 'bronze')
orders_output_silver = os.path.join(database_dir, 'fact_orders', 'silver')
orders_output_gold = os.path.join(database_dir, 'fact_orders', 'gold')

print("Configuration complete!")
print(f"Streaming data directory: {streaming_data_dir}")
print(f"Output database directory: {database_dir}")

NameError: name 'MYSQL_USER' is not defined

### 3.0. Define Helper Functions

In [None]:
def get_mongo_uri(**args):
    """Generate MongoDB Atlas connection URI"""
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"
    return uri


def get_spark_conf(**args):
    """Create Spark configuration with all necessary settings"""
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mysql_dataframe(spark_session, sql_query: str, **args):
    """Read data from MySQL using JDBC"""
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe


def get_mongodb_dataframe(spark_session, **args):
    """Read data from MongoDB Atlas"""
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    # Drop the '_id' index column
    dframe = dframe.drop('_id')
    
    return dframe


def remove_directory_tree(path: str):
    """Remove directory tree for clean restart"""
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' removed successfully."
        else:
            return f"Directory '{path}' does not exist."
    except Exception as e:
        return f"Error: {e}"


def wait_until_stream_is_ready(query, min_batches=1):
    """Wait for streaming query to process batches"""
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
    print(f"Stream has processed {len(query.recentProgress)} batches")

print("Helper functions defined!")

Helper functions defined!


### 4.0. Clean Up Previous Runs

In [None]:
result = remove_directory_tree(database_dir)
print(result)

Directory 'c:\Users\sebas\ds2002\final_project\spark-warehouse\northwind_project2_dlh.db' removed successfully.


### 5.0. Create Spark Session

In [None]:
# Spark configuration arguments
worker_threads = f"local[{int(os.cpu_count()/2)}]"
mongo_uri = get_mongo_uri(**mongodb_args)

# MySQL JAR path - UPDATED TO CORRECT LOCATION
mysql_jar_path = r"C:\Users\sebas\DS-2002\04-PySpark\mysql-connector-j-9.1.0\mysql-connector-j-9.1.0.jar"

sparkConf_args = {
    "app_name" : "DS-2002 Project 2 - Northwind Data Lakehouse",
    "worker_threads" : worker_threads,
    "shuffle_partitions" : int(os.cpu_count()),
    "mongo_uri" : mongo_uri,
    "spark_jars" : mysql_jar_path,
    "database_dir" : sql_warehouse_dir
}

# Create Spark configuration
sparkConf = get_spark_conf(**sparkConf_args)

# Create Spark session
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("WARN")

print(" Spark session created successfully!")
spark

✓ Spark session created successfully!


### 6.0. Create Data Lakehouse Database

In [None]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE")

sql_create_db = f"""
CREATE DATABASE IF NOT EXISTS {dest_database}
COMMENT 'DS-2002 Project 2 - Northwind Data Lakehouse'
WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Final Project')
"""

spark.sql(sql_create_db)
print(f" Database '{dest_database}' created!")

✓ Database 'northwind_project2_dlh' created!


## Section II: Load Dimension Tables
### 1.0. Load Dimensions from MongoDB Atlas (NoSQL Source)

In [None]:
# Load dim_customers from MongoDB
mongodb_args['collection'] = 'dim_customers'
dim_customers = get_mongodb_dataframe(spark, **mongodb_args)

print(f" Loaded {dim_customers.count()} customers from MongoDB Atlas")
dim_customers.show(5)

✓ Loaded 29 customers from MongoDB Atlas
+-------+------------+-------------+
|country|customer_key|customer_name|
+-------+------------+-------------+
|    USA|          12|    Company L|
|    USA|          16|    Company P|
|    USA|           2|    Company B|
|    USA|          21|    Company U|
|    USA|          24|    Company X|
+-------+------------+-------------+
only showing top 5 rows



In [None]:
# Load dim_products from MongoDB
mongodb_args['collection'] = 'dim_products'
dim_products = get_mongodb_dataframe(spark, **mongodb_args)

print(f" Loaded {dim_products.count()} products from MongoDB Atlas")
dim_products.show(5)

✓ Loaded 45 products from MongoDB Atlas
+-----------+--------------------+----------+
|product_key|        product_name|unit_price|
+-----------+--------------------+----------+
|         14|Northwind Traders...|     23.25|
|         19|Northwind Traders...|       9.2|
|         56|Northwind Traders...|      38.0|
|         83|Northwind Traders...|       1.8|
|         90|Northwind Traders...|       1.8|
+-----------+--------------------+----------+
only showing top 5 rows



### 2.0. Load Dimensions from MySQL (Relational Source)

In [None]:
# Load dim_employees from local MySQL
sql_employees = "SELECT * FROM employees"
dim_employees = get_mysql_dataframe(spark, sql_employees, **mysql_args)

print(f" Loaded {dim_employees.count()} employees from MySQL")
dim_employees.show(5)

✓ Loaded 9 employees from MySQL
+---+-----------------+---------+----------+--------------------+--------------------+--------------+-------------+------------+-------------+--------------+--------+--------------+---------------+--------------+--------------------+--------------------+-----------+
| id|          company|last_name|first_name|       email_address|           job_title|business_phone|   home_phone|mobile_phone|   fax_number|       address|    city|state_province|zip_postal_code|country_region|            web_page|               notes|attachments|
+---+-----------------+---------+----------+--------------------+--------------------+--------------+-------------+------------+-------------+--------------+--------+--------------+---------------+--------------+--------------------+--------------------+-----------+
|  1|Northwind Traders|Freehafer|     Nancy|nancy@northwindtr...|Sales Representative| (123)555-0100|(123)555-0102|        NULL|(123)555-0103|123 1st Avenue| Seattle| 

In [None]:
# Load dim_shippers from local MySQL
sql_shippers = "SELECT * FROM shippers"
dim_shippers = get_mysql_dataframe(spark, sql_shippers, **mysql_args)

print(f" Loaded {dim_shippers.count()} shippers from MySQL")
dim_shippers.show()

✓ Loaded 3 shippers from MySQL
+---+------------------+---------+----------+-------------+---------+--------------+----------+------------+----------+--------------+-------+--------------+---------------+--------------+--------+-----+-----------+
| id|           company|last_name|first_name|email_address|job_title|business_phone|home_phone|mobile_phone|fax_number|       address|   city|state_province|zip_postal_code|country_region|web_page|notes|attachments|
+---+------------------+---------+----------+-------------+---------+--------------+----------+------------+----------+--------------+-------+--------------+---------------+--------------+--------+-----+-----------+
|  1|Shipping Company A|     NULL|      NULL|         NULL|     NULL|          NULL|      NULL|        NULL|      NULL|123 Any Street|Memphis|            TN|          99999|           USA|    NULL| NULL|         []|
|  2|Shipping Company B|     NULL|      NULL|         NULL|     NULL|          NULL|      NULL|        NU

### 3.0. Create Date Dimension

In [None]:
# Create date dimension from 2006-01-01 to 2006-12-31
date_range_df = spark.sql("""
    SELECT 
        sequence(to_date('2006-01-01'), to_date('2006-12-31'), interval 1 day) as date
""")

dim_date = date_range_df.select(explode(col("date")).alias("full_date")) \
    .select(
        col("full_date"),
        year("full_date").alias("year"),
        month("full_date").alias("month"),
        dayofmonth("full_date").alias("day"),
        dayofweek("full_date").alias("day_of_week"),
        dayofyear("full_date").alias("day_of_year"),
        weekofyear("full_date").alias("week_of_year"),
        quarter("full_date").alias("quarter"),
        date_format("full_date", "EEEE").alias("day_name"),
        date_format("full_date", "MMMM").alias("month_name")
    )

print(f" Generated date dimension with {dim_date.count()} dates")
dim_date.show(5)

✓ Generated date dimension with 365 dates
+----------+----+-----+---+-----------+-----------+------------+-------+---------+----------+
| full_date|year|month|day|day_of_week|day_of_year|week_of_year|quarter| day_name|month_name|
+----------+----+-----+---+-----------+-----------+------------+-------+---------+----------+
|2006-01-01|2006|    1|  1|          1|          1|          52|      1|   Sunday|   January|
|2006-01-02|2006|    1|  2|          2|          2|           1|      1|   Monday|   January|
|2006-01-03|2006|    1|  3|          3|          3|           1|      1|  Tuesday|   January|
|2006-01-04|2006|    1|  4|          4|          4|           1|      1|Wednesday|   January|
|2006-01-05|2006|    1|  5|          5|          5|           1|      1| Thursday|   January|
+----------+----+-----+---+-----------+-----------+------------+-------+---------+----------+
only showing top 5 rows



## Section III: Streaming Data Integration (Bronze → Silver → Gold)
### Bronze Layer: Ingest Streaming Order Details from JSON Files

#### 1.0. Define Schema for Streaming Data

In [None]:
# Define schema for incoming order details stream
order_details_schema = StructType([
    StructField("order_detail_id", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", DoubleType(), True),
    StructField("unit_price", DoubleType(), True),
    StructField("discount", DoubleType(), True),
    StructField("status_id", IntegerType(), True),
    StructField("order_date", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("employee_id", IntegerType(), True),
    StructField("shipper_id", IntegerType(), True)
])

print(" Schema defined for streaming order details")

✓ Schema defined for streaming order details


#### 2.0. Convert JSON Files to Newline-Delimited Format

In [None]:
# ============================================================================
# CREATE STREAMING DATA FILES FROM MySQL
# ============================================================================
import json
import mysql.connector
from decimal import Decimal

print("Creating streaming data files from MySQL...")

# Helper function to convert Decimal to float
def decimal_to_float(obj):
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError

# Connect to MySQL
conn = mysql.connector.connect(
    host="127.0.0.1",
    user="sebastian",
    password="SebasBal5*",
    database="northwind"
)

cursor = conn.cursor(dictionary=True)

# Get all order details with order information
cursor.execute("""
    SELECT 
        od.id as order_detail_id,
        o.id as order_id,
        od.product_id,
        od.quantity,
        od.unit_price,
        od.discount,
        od.status_id,
        DATE_FORMAT(o.order_date, '%Y-%m-%d') as order_date,
        o.customer_id,
        o.employee_id,
        o.shipper_id
    FROM order_details od
    JOIN orders o ON od.order_id = o.id
    ORDER BY o.order_date
""")

all_records = cursor.fetchall()
cursor.close()
conn.close()

print(f" Extracted {len(all_records)} order detail records from MySQL")

# Split into 3 batches for streaming simulation
total = len(all_records)
batch1 = all_records[:total//3]
batch2 = all_records[total//3:(total//3)*2]
batch3 = all_records[(total//3)*2:]

# Create streaming_data directory if it doesn't exist
os.makedirs(streaming_data_dir, exist_ok=True)

# Write each batch to a JSON file with Decimal conversion
for i, batch in enumerate([batch1, batch2, batch3], 1):
    file_path = os.path.join(streaming_data_dir, f'order_details_batch{i}.json')
    with open(file_path, 'w') as f:
        json.dump(batch, f, indent=2, default=decimal_to_float)
    print(f" Created batch{i} with {len(batch)} records")

print(f"\n All streaming data files created in: {streaming_data_dir}")
print(f"Files: {os.listdir(streaming_data_dir)}")

Creating streaming data files from MySQL...
✓ Extracted 58 order detail records from MySQL
✓ Created batch1 with 19 records
✓ Created batch2 with 19 records
✓ Created batch3 with 20 records

✓ All streaming data files created in: c:\Users\sebas\ds2002\final_project\streaming_data
Files: ['order_details_batch1.json', 'order_details_batch2.json', 'order_details_batch3.json', 'test.json']


In [None]:
# Convert JSON files from array format to newline-delimited format
# Spark streaming requires one JSON object per line

for i in range(1, 4):
    file_path = os.path.join(streaming_data_dir, f'order_details_batch{i}.json')
    
    # Check if file exists
    if not os.path.exists(file_path):
        print(f"  File not found: {file_path}")
        continue
    
    # Read array-format file
    with open(file_path, 'r') as f:
        data = json.load(f)
    
    # Overwrite with newline-delimited format
    with open(file_path, 'w') as f:
        for record in data:
            f.write(json.dumps(record) + '\n')
    
    print(f" Converted batch{i} ({len(data)} records)")

print("\n All files converted to newline-delimited JSON!")

✓ Converted batch1 (19 records)
✓ Converted batch2 (19 records)
✓ Converted batch3 (20 records)

✓ All files converted to newline-delimited JSON!


#### 3.0. Configure Bronze Streaming Ingestion

In [None]:
# Read streaming data from JSON files (AutoLoader pattern)
bronze_stream = spark.readStream \
    .format("json") \
    .schema(order_details_schema) \
    .option("maxFilesPerTrigger", 1) \
    .load(streaming_data_dir + "/order_details_*.json")

# Add metadata columns
bronze_stream = bronze_stream \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .withColumn("source_file", input_file_name())

print(" Bronze stream configured")
print("Schema:")
bronze_stream.printSchema()

✓ Bronze stream configured
Schema:
root
 |-- order_detail_id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: double (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- status_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- shipper_id: integer (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)
 |-- source_file: string (nullable = false)



#### 4.0. Write Bronze Stream to Parquet

In [None]:
# Write bronze stream to Parquet format
bronze_query = bronze_stream.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", f"{orders_output_bronze}_checkpoint") \
    .start(orders_output_bronze)

print(" Bronze stream started - processing JSON batches...")
print("Waiting for batches to process...")

# Wait for all 3 batches to process
wait_until_stream_is_ready(bronze_query, min_batches=3)

# Stop the bronze stream
bronze_query.stop()
print(" Bronze layer complete!")

✓ Bronze stream started - processing JSON batches...
Waiting for batches to process...
Stream has processed 3 batches
✓ Bronze layer complete!


#### 5.0. Verify Bronze Data

In [None]:
# Verify bronze data
bronze_df = spark.read.format("parquet").load(orders_output_bronze)
print(f" Bronze table contains {bronze_df.count()} order detail records")
bronze_df.show(10)

✓ Bronze table contains 58 order detail records
+---------------+--------+----------+--------+----------+--------+---------+----------+-----------+-----------+----------+--------------------+--------------------+
|order_detail_id|order_id|product_id|quantity|unit_price|discount|status_id|order_date|customer_id|employee_id|shipper_id| ingestion_timestamp|         source_file|
+---------------+--------+----------+--------+----------+--------+---------+----------+-----------+-----------+----------+--------------------+--------------------+
|             27|      30|        34|   100.0|      14.0|     0.0|        2|2006-01-15|         27|          9|         2|2025-12-15 19:30:...|file:///c:/Users/...|
|             28|      30|        80|    30.0|       3.5|     0.0|        2|2006-01-15|         27|          9|         2|2025-12-15 19:30:...|file:///c:/Users/...|
|             29|      31|         7|    10.0|      30.0|     0.0|        2|2006-01-20|          4|          3|         1|2025-

### Silver Layer: Join Streaming Data with Dimensions

In [None]:
# Read from bronze as a stream
silver_stream = spark.readStream \
    .format("parquet") \
    .load(orders_output_bronze)

# Convert order_date to proper date type
silver_stream = silver_stream.withColumn("order_date", to_date(col("order_date")))

# Join with dim_customers (MongoDB) - NO CITY COLUMN
silver_stream = silver_stream \
    .join(dim_customers.select("customer_key", "customer_name", "country"), 
          silver_stream.customer_id == dim_customers.customer_key, "left") \
    .withColumnRenamed("country", "customer_country")

# Join with dim_products (MongoDB)
silver_stream = silver_stream \
    .join(dim_products.select("product_key", "product_name", col("unit_price").alias("list_price")), 
          silver_stream.product_id == dim_products.product_key, "left")

# Join with dim_employees (MySQL)
silver_stream = silver_stream \
    .join(
        dim_employees.select(
            col("id").alias("emp_id"),
            concat(col("first_name"), lit(" "), col("last_name")).alias("employee_name")
        ), 
        silver_stream.employee_id == col("emp_id"), 
        "left"
    ) \
    .drop("emp_id")

# Join with dim_shippers (MySQL)
silver_stream = silver_stream \
    .join(
        dim_shippers.select(
            col("id").alias("ship_id"),
            col("company").alias("shipper_name")
        ), 
        silver_stream.shipper_id == col("ship_id"), 
        "left"
    ) \
    .drop("ship_id")

# Calculate line_total
silver_stream = silver_stream.withColumn(
    "line_total", 
    col("quantity") * col("unit_price") * (1 - col("discount"))
)

print(" Silver stream configured with dimension joins")
silver_stream.printSchema()

✓ Silver stream configured with dimension joins
root
 |-- order_detail_id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: double (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- status_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- shipper_id: integer (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- source_file: string (nullable = true)
 |-- customer_key: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- customer_country: string (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- list_price: double (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- shipper_name: string (nullable = true)
 |-- line_total: double (n

#### Write Silver Stream to Parquet

In [None]:
# Write silver stream to Parquet
silver_query = silver_stream.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", f"{orders_output_silver}_checkpoint") \
    .start(orders_output_silver)

print(" Silver stream started - joining with dimensions...")
print("Waiting for processing...")

# Wait for processing
wait_until_stream_is_ready(silver_query, min_batches=3)

# Stop the silver stream
silver_query.stop()
print(" Silver layer complete!")

✓ Silver stream started - joining with dimensions...
Waiting for processing...
Stream has processed 3 batches
✓ Silver layer complete!


#### Verify Silver Data

In [None]:
# Verify silver data
silver_df = spark.read.format("parquet").load(orders_output_silver)
print(f" Silver table contains {silver_df.count()} enriched records")
silver_df.show(10)

✓ Silver table contains 58 enriched records
+---------------+--------+----------+--------+----------+--------+---------+----------+-----------+-----------+----------+--------------------+--------------------+------------+-------------+----------------+-----------+--------------------+----------+-------------------+------------------+----------+
|order_detail_id|order_id|product_id|quantity|unit_price|discount|status_id|order_date|customer_id|employee_id|shipper_id| ingestion_timestamp|         source_file|customer_key|customer_name|customer_country|product_key|        product_name|list_price|      employee_name|      shipper_name|line_total|
+---------------+--------+----------+--------+----------+--------+---------+----------+-----------+-----------+----------+--------------------+--------------------+------------+-------------+----------------+-----------+--------------------+----------+-------------------+------------------+----------+
|             79|      69|        80|    15.0| 

In [None]:
# Show specific enriched columns
silver_df.select("order_id", "customer_name", "customer_country", "product_name", "employee_name", "shipper_name", "line_total").show(10)

+--------+-------------+----------------+--------------------+-------------------+------------------+----------+
|order_id|customer_name|customer_country|        product_name|      employee_name|      shipper_name|line_total|
+--------+-------------+----------------+--------------------+-------------------+------------------+----------+
|      69|    Company J|             USA|Northwind Traders...|    Nancy Freehafer|Shipping Company A|      52.5|
|      42|    Company J|             USA|Northwind Traders...|    Nancy Freehafer|Shipping Company A|     250.0|
|      42|    Company J|             USA|Northwind Traders...|    Nancy Freehafer|Shipping Company A|      92.0|
|      42|    Company J|             USA|Northwind Traders...|    Nancy Freehafer|Shipping Company A|     220.0|
|      31|    Company D|             USA|Northwind Traders...|          Jan Kotas|Shipping Company A|      35.0|
|      58|    Company D|             USA|Northwind Traders...|          Jan Kotas|Shipping Compa

### Gold Layer: Final Fact Table

In [None]:
# Read silver data (batch mode for final transformation)
gold_df = spark.read.format("parquet").load(orders_output_silver)

# Select final fact table columns
fact_orders = gold_df.select(
    col("order_detail_id").alias("fact_key"),
    col("order_id"),
    col("order_date"),
    col("customer_key"),
    col("customer_name"),
    col("customer_country"),
    col("product_key"),
    col("product_name"),
    col("employee_id").alias("employee_key"),
    col("employee_name"),
    col("shipper_id").alias("shipper_key"),
    col("shipper_name"),
    col("quantity"),
    col("unit_price"),
    col("discount"),
    col("line_total")
)

# Write to Gold layer as Parquet
fact_orders.write.format("parquet").mode("overwrite").save(orders_output_gold)

print(" Gold layer complete!")
print(f" Final fact_orders table contains {fact_orders.count()} records")
fact_orders.show(10)

✓ Gold layer complete!
✓ Final fact_orders table contains 58 records
+--------+--------+----------+------------+-------------+----------------+-----------+--------------------+------------+-------------------+-----------+------------------+--------+----------+--------+----------+
|fact_key|order_id|order_date|customer_key|customer_name|customer_country|product_key|        product_name|employee_key|      employee_name|shipper_key|      shipper_name|quantity|unit_price|discount|line_total|
+--------+--------+----------+------------+-------------+----------------+-----------+--------------------+------------+-------------------+-----------+------------------+--------+----------+--------+----------+
|      79|      69|2006-05-24|          10|    Company J|             USA|         80|Northwind Traders...|           1|    Nancy Freehafer|          1|Shipping Company A|    15.0|       3.5|     0.0|      52.5|
|      43|      42|2006-03-24|          10|    Company J|             USA|         

## Section IV: Business Analytics Queries

### 1.0. Total Sales by Country

In [None]:
# Load gold fact table
fact_orders_df = spark.read.format("parquet").load(orders_output_gold)

# Total sales by country
sales_by_country = fact_orders_df.groupBy("customer_country") \
    .agg(sum("line_total").alias("total_sales")) \
    .orderBy(desc("total_sales"))

print("Total Sales by Country:")
sales_by_country.show()

Total Sales by Country:
+----------------+-----------+
|customer_country|total_sales|
+----------------+-----------+
|             USA|    68137.0|
+----------------+-----------+



### 2.0. Top Performing Employees

In [None]:
# Top performing employees by revenue
top_employees = fact_orders_df.groupBy("employee_name") \
    .agg(
        sum("line_total").alias("total_revenue"),
        count("*").alias("num_orders")
    ) \
    .orderBy(desc("total_revenue"))

print("Top Performing Employees:")
top_employees.show()

Top Performing Employees:
+-------------------+-------------+----------+
|      employee_name|total_revenue|num_orders|
+-------------------+-------------+----------+
|    Nancy Freehafer|     22255.25|        17|
|Anne Hellung-Larsen|     19974.25|        10|
|    Michael Neipper|       6378.0|         4|
|   Mariya Sergienko|       6278.0|        10|
|          Jan Kotas|       5787.5|         7|
|        Robert Zare|       3786.5|         3|
|     Andrew Cencini|       2997.5|         6|
|     Laura Giussani|        680.0|         1|
+-------------------+-------------+----------+



### 3.0. Product Performance Analysis

In [None]:
# Top 10 products by revenue
top_products = fact_orders_df.groupBy("product_name") \
    .agg(
        sum("line_total").alias("total_revenue"),
        sum("quantity").alias("total_quantity")
    ) \
    .orderBy(desc("total_revenue")) \
    .limit(10)

print("Top 10 Products by Revenue:")
top_products.show()

Top 10 Products by Revenue:
+--------------------+-------------+--------------+
|        product_name|total_revenue|total_quantity|
+--------------------+-------------+--------------+
|Northwind Traders...|      29900.0|         650.0|
|Northwind Traders...|       6818.0|         487.0|
|Northwind Traders...|       3240.0|          40.0|
|Northwind Traders...|       3132.0|          90.0|
|Northwind Traders...|       2798.5|         290.0|
|Northwind Traders...|       2600.0|          65.0|
|Northwind Traders...|       2550.0|         200.0|
|Northwind Traders...|       2500.0|         100.0|
|Northwind Traders...|       2208.0|         120.0|
|Northwind Traders...|       2120.0|          40.0|
+--------------------+-------------+--------------+



### 4.0. Monthly Sales Trends

In [None]:
# Monthly sales trends
monthly_sales = fact_orders_df \
    .withColumn("year", year("order_date")) \
    .withColumn("month", month("order_date")) \
    .groupBy("year", "month") \
    .agg(
        sum("line_total").alias("total_sales"),
        count("*").alias("num_orders")
    ) \
    .orderBy("year", "month")

print("Monthly Sales Trends:")
monthly_sales.show()

Monthly Sales Trends:
+----+-----+-----------+----------+
|year|month|total_sales|num_orders|
+----+-----+-----------+----------+
|2006|    1|     3836.0|         8|
|2006|    2|     2241.5|         3|
|2006|    3|   32609.25|        13|
|2006|    4|   19355.25|        21|
|2006|    5|     1788.5|         4|
|2006|    6|     8306.5|         9|
+----+-----+-----------+----------+



### 5.0. Customer Analysis

In [None]:
# Top 10 customers by total spending
top_customers = fact_orders_df.groupBy("customer_name", "customer_city", "customer_country") \
    .agg(
        sum("line_total").alias("total_spent"),
        count("order_id").alias("num_orders")
    ) \
    .orderBy(desc("total_spent")) \
    .limit(10)

print("Top 10 Customers by Total Spending:")
top_customers.show()

NameError: name 'fact_orders_df' is not defined

## Section V: Project Summary

In [None]:
print("="*80)
print("DS-2002 Project 2: Northwind Data Lakehouse - COMPLETED")
print("="*80)
print("\n✓ Multi-Source Data Integration:")
print(f"  • MongoDB Atlas (NoSQL): {dim_customers.count()} customers, {dim_products.count()} products")
print(f"  • MySQL (Relational): {dim_employees.count()} employees, {dim_shippers.count()} shippers")
print(f"  • File System (Streaming): {bronze_df.count()} order details from 3 JSON batches")
print("\n✓ Medallion Architecture Implemented:")
print(f"  • Bronze Layer: {bronze_df.count()} raw records")
print(f"  • Silver Layer: {silver_df.count()} enriched records with dimension joins")
print(f"  • Gold Layer: {fact_orders.count()} final fact table records")
print("\n✓ Technologies Used:")
print("  • PySpark Structured Streaming")
print("  • Parquet Storage Format")
print("  • MongoDB Spark Connector")
print("  • MySQL JDBC Driver")
print("\n✓ Analytics Queries: 5 business intelligence queries executed")
print("="*80)