In [0]:
# Employee dimension: read the Northwind employees CSV from the DBFS volume
# (first row is the header, column types are inferred) and persist it as the
# Delta table default.dim_employees.
# Counts toward the "At least 3 additional dimension tables" requirement and
# the requirement to use a cloud-based file system (DBFS) as a source.
df_dim_employees = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("dbfs:/Volumes/workspace/default/northwind_schema/Northwind_DimEmployees.csv")
)

# overwriteSchema lets the overwrite replace the table schema as well as its rows.
(
    df_dim_employees.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("default.dim_employees")
)


In [0]:
# Query the saved employee dimension table and print the first rows
# (show() defaults to 20 rows with long values truncated) to confirm the load.
spark.sql("SELECT * FROM default.dim_employees").show()

+------------+-----------------+--------------+----------+--------------------+--------------------+--------------+-------------+-------------+--------------+--------+--------------+---------------+--------------+--------------------+
|employee_key|          company|     last_name|first_name|       email_address|           job_title|business_phone|   home_phone|   fax_number|       address|    city|state_province|zip_postal_code|country_region|            web_page|
+------------+-----------------+--------------+----------+--------------------+--------------------+--------------+-------------+-------------+--------------+--------+--------------+---------------+--------------+--------------------+
|           1|Northwind Traders|     Freehafer|     Nancy|nancy@northwindtr...|Sales Representative| (123)555-0100|(123)555-0102|(123)555-0103|123 1st Avenue| Seattle|            WA|          99999|           USA|#http://northwind...|
|           2|Northwind Traders|       Cencini|    Andrew|an

In [0]:
# Department dimension, built in memory from a small fixed list of (id, name) pairs.
# Counts toward the "At least 3 additional dimension tables" requirement.
from pyspark.sql import Row  # Row is also used by later cells that build DataFrames

departments = [
    Row(department_id=dept_id, department_name=dept_name)
    for dept_id, dept_name in [(1, "HR"), (2, "Engineering"), (3, "Sales")]
]
df_dim_departments = spark.createDataFrame(departments)
df_dim_departments.show()

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            1|             HR|
|            2|    Engineering|
|            3|          Sales|
+-------------+---------------+



In [0]:
# Persist the department dimension as the Delta table default.dim_departments,
# replacing whatever rows the table held before.
(
    df_dim_departments.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.dim_departments")
)

In [0]:
# First version of the sales fact: three hand-written (simulated) transactions,
# each pointing at an employee and a department and carrying a sale amount.
# Addresses the "At least 1 fact table that models the business process" requirement.

from pyspark.sql import Row

sales = [
    Row(sale_id=sale_id, employee_id=employee_id, department_id=department_id, amount=amount)
    for sale_id, employee_id, department_id, amount in [
        (1001, 1, 1, 500.0),
        (1002, 2, 2, 1200.0),
        (1003, 3, 3, 300.0),
    ]
]
df_fact_sales = spark.createDataFrame(sales)
df_fact_sales.show()

+-------+-----------+-------------+------+
|sale_id|employee_id|department_id|amount|
+-------+-----------+-------------+------+
|   1001|          1|            1| 500.0|
|   1002|          2|            2|1200.0|
|   1003|          3|            3| 300.0|
+-------+-----------+-------------+------+



In [0]:
# Write the sales fact DataFrame to the Delta table default.fact_sales.
# mergeSchema allows the overwrite to add columns the existing table lacks.

(
    df_fact_sales.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable("default.fact_sales")
)

In [0]:
# Bronze ingest: read the purchase-order fact JSON (multi-line records) from the
# DBFS volume and store it unchanged as Delta files under the bronze/ folder.

df_fact_purchase_orders = (
    spark.read
    .option("multiline", "true")
    .json("dbfs:/Volumes/workspace/default/northwind_schema/Northwind_Fact_PurchaseOrders01.json")
)
(
    df_fact_purchase_orders.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/Volumes/workspace/default/northwind_schema/bronze/Northwind_Fact_PurchaseOrders01")
)

In [0]:

# "At least 3 additional dimension tables"
# Product dimension (dimension table 3): three in-memory products with a list
# price, saved as the Delta table default.dim_products.
# Row comes from an import in an earlier cell.

product_rows = [
    Row(product_id=product_id, product_name=product_name, price=price)
    for product_id, product_name, price in [
        (1001, "Laptop", 1000.0),
        (1002, "Mouse", 25.0),
        (1003, "Keyboard", 45.0),
    ]
]
df_dim_products = spark.createDataFrame(product_rows)
(
    df_dim_products.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.dim_products")
)

In [0]:
# Customer dimension (dimension table 4): three in-memory customers with a
# sales region, saved as the Delta table default.dim_customers.

from pyspark.sql import Row

customer_rows = [
    Row(customer_id=customer_id, name=name, region=region)
    for customer_id, name, region in [
        (101, "John Doe", "East"),
        (102, "Jane Smith", "West"),
        (103, "Carlos Rivera", "South"),
    ]
]
df_dim_customers = spark.createDataFrame(customer_rows)
(
    df_dim_customers.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.dim_customers")
)

In [0]:
# Date dimension: one row per calendar day with derived day/month/year,
# day and month names, week of year and quarter (project requirement).

# col and to_date are also relied on by the later cell that rebuilds fact_sales.
from pyspark.sql.functions import col, to_date, monotonically_increasing_id
from pyspark.sql import functions as F

# Day offsets 0..364 added to 2023-01-01, i.e. 365 consecutive dates.
date_range = spark.range(0, 365).withColumn(
    "date",
    F.expr("date_add('2023-01-01', cast(id as int))"),
)

df_dim_date = date_range.select(
    F.col("date").alias("date"),
    F.dayofmonth(F.col("date")).alias("day"),
    F.month(F.col("date")).alias("month"),
    F.year(F.col("date")).alias("year"),
    F.date_format(F.col("date"), "EEEE").alias("day_name"),    # full weekday name
    F.date_format(F.col("date"), "MMMM").alias("month_name"),  # full month name
    F.weekofyear(F.col("date")).alias("week"),
    F.quarter(F.col("date")).alias("quarter"),
)
df_dim_date.show(n=5)

(
    df_dim_date.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.dim_date")
)

+----------+---+-----+----+---------+----------+----+-------+
|      date|day|month|year| day_name|month_name|week|quarter|
+----------+---+-----+----+---------+----------+----+-------+
|2023-01-01|  1|    1|2023|   Sunday|   January|  52|      1|
|2023-01-02|  2|    1|2023|   Monday|   January|   1|      1|
|2023-01-03|  3|    1|2023|  Tuesday|   January|   1|      1|
|2023-01-04|  4|    1|2023|Wednesday|   January|   1|      1|
|2023-01-05|  5|    1|2023| Thursday|   January|   1|      1|
+----------+---+-----+----+---------+----------+----+-------+
only showing top 5 rows


In [0]:
# Remove any existing default.fact_sales table (no error if it is absent);
# the following cell writes the table again from df_fact_sales.
spark.sql("DROP TABLE IF EXISTS default.fact_sales")

DataFrame[]

In [0]:
# Recreate default.fact_sales from the current df_fact_sales DataFrame.
(
    df_fact_sales.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.fact_sales")
)

In [0]:
# Rebuild the fact_sales table that models the core business process (sales transactions).
# - Source: three in-memory rows keyed by employee, customer, product and date.
# - Transform: cast the date string to a DATE and derive total_amount = quantity * unit_price.
# - Load: overwrite the Delta table default.fact_sales.
from pyspark.sql import Row
from pyspark.sql.functions import col, to_date  # imported here so the cell does not depend on an earlier cell's imports

sales_data = [
    Row(sale_id=1, employee_id=1, customer_id=101, product_id=1001, date="2023-01-01", quantity=2, unit_price=10.0),
    Row(sale_id=2, employee_id=2, customer_id=102, product_id=1002, date="2023-01-02", quantity=1, unit_price=20.0),
    Row(sale_id=3, employee_id=3, customer_id=103, product_id=1003, date="2023-01-03", quantity=5, unit_price=5.0)
]
df_fact_sales = spark.createDataFrame(sales_data)
df_fact_sales = df_fact_sales.withColumn("date", to_date("date"))
df_fact_sales = df_fact_sales.withColumn("total_amount", col("quantity") * col("unit_price"))
df_fact_sales.show()

# overwriteSchema replaces the table schema with this DataFrame's schema.
# mergeSchema would instead merge the two, keeping the department_id and amount
# columns of the earlier fact_sales version in the table as all-NULL columns.
df_fact_sales.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("default.fact_sales")

+-------+-----------+-----------+----------+----------+--------+----------+------------+
|sale_id|employee_id|customer_id|product_id|      date|quantity|unit_price|total_amount|
+-------+-----------+-----------+----------+----------+--------+----------+------------+
|      1|          1|        101|      1001|2023-01-01|       2|      10.0|        20.0|
|      2|          2|        102|      1002|2023-01-02|       1|      20.0|        20.0|
|      3|          3|        103|      1003|2023-01-03|       5|       5.0|        25.0|
+-------+-----------+-----------+----------+----------+--------+----------+------------+



In [0]:
# Silver layer: enrich the sales fact by joining it with the employee,
# customer and product dimensions (all inner joins).
# Employees match on employee_id = employee_key; customers and products share
# their key column name with the fact table.

df_silver_sales = (
    df_fact_sales
    .join(
        df_dim_employees,
        on=df_fact_sales["employee_id"] == df_dim_employees["employee_key"],
    )
    .join(df_dim_customers, on="customer_id")
    .join(df_dim_products, on="product_id")
)



In [0]:
# Drop the employee dimension's join key: after the join it duplicates the
# fact table's employee_id.
df_silver_sales = df_silver_sales.drop("employee_key")

In [0]:
from pyspark.sql.functions import concat_ws

# Keep only the reporting columns of the joined sales data.
# employee_name is "<first_name> <last_name>" from the employee dimension;
# customer_name is the customer dimension's `name` column.
df_silver_sales_clean = df_silver_sales.select(
    "sale_id",
    "employee_id",
    concat_ws(
        " ",
        df_dim_employees["first_name"],
        df_dim_employees["last_name"],
    ).alias("employee_name"),
    "customer_id",
    df_dim_customers["name"].alias("customer_name"),
    "product_id",
    "product_name",
    "date",
    "quantity",
    "unit_price",
    "total_amount",
)

# Drop any previous Silver table, then save the cleaned result as Delta.
spark.sql("DROP TABLE IF EXISTS default.silver_sales")
(
    df_silver_sales_clean.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.silver_sales")
)


In [0]:
# Gold layer: total quantity sold and total revenue per (employee, product),
# computed from the saved Silver sales table (OLAP-style summary).

# Note: this import shadows Python's built-in sum for the rest of the notebook.
from pyspark.sql.functions import sum

df_silver_sales = spark.table("default.silver_sales")
df_gold_sales_summary = (
    df_silver_sales
    .groupBy("employee_name", "product_name")
    .agg(
        sum("quantity").alias("total_quantity_sold"),
        sum("total_amount").alias("total_revenue"),
    )
)

# Drop any previous Gold table, save the summary, then print it.
spark.sql("DROP TABLE IF EXISTS default.gold_sales_summary")
(
    df_gold_sales_summary.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.gold_sales_summary")
)
df_gold_sales_summary.show()

+---------------+------------+-------------------+-------------+
|  employee_name|product_name|total_quantity_sold|total_revenue|
+---------------+------------+-------------------+-------------+
|      Jan Kotas|    Keyboard|                  5|         25.0|
| Andrew Cencini|       Mouse|                  1|         20.0|
|Nancy Freehafer|      Laptop|                  2|         20.0|
+---------------+------------+-------------------+-------------+



In [0]:
# Read the purchase-order fact JSON (multi-line records) and inspect it:
# print the inferred schema, then the first five rows without truncation.
df_flat = (
    spark.read
    .option("multiline", "true")
    .json("dbfs:/Volumes/workspace/default/northwind_schema/Northwind_Fact_PurchaseOrders01.json")
)

df_flat.printSchema()
df_flat.show(n=5, truncate=False)


root
 |-- approved_by: long (nullable = true)
 |-- approved_date_key: long (nullable = true)
 |-- created_by: long (nullable = true)
 |-- creation_date_key: long (nullable = true)
 |-- fact_purchase_order_key: long (nullable = true)
 |-- inventory_key: long (nullable = true)
 |-- payment_amount: long (nullable = true)
 |-- payment_date: string (nullable = true)
 |-- po_detail_date_received_key: long (nullable = true)
 |-- po_detail_posted_to_inventory: long (nullable = true)
 |-- po_detail_quantity: long (nullable = true)
 |-- po_detail_unit_cost: long (nullable = true)
 |-- product_key: long (nullable = true)
 |-- purchase_order_key: long (nullable = true)
 |-- purchase_order_status: string (nullable = true)
 |-- shipping_fee: long (nullable = true)
 |-- submitted_by: long (nullable = true)
 |-- submitted_date_key: long (nullable = true)
 |-- supplier_key: long (nullable = true)
 |-- taxes: long (nullable = true)

+-----------+-----------------+----------+-----------------+-----------

In [0]:
# Bronze layer: store the raw purchase orders as Delta files at the
# bronze_purchase_orders path.
# NOTE(review): with mergeSchema the overwrite appears to keep columns that
# already exist at this path — the schema printed when the path is read back
# lists PurchaseOrderID, EmployeeID, VendorID, OrderDate and TotalDue, none of
# which are in the JSON schema. A later cell selects TotalDue, so switching to
# overwriteSchema would require adjusting that cell as well.
(
    df_flat.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .save("dbfs:/Volumes/workspace/default/northwind_schema/bronze_purchase_orders")
)

In [0]:
# Extract step for the Silver transformation: load the Bronze purchase orders
# back from their Delta path, then print the schema and the first five rows
# without truncation.

bronze = (
    spark.read
    .format("delta")
    .load("dbfs:/Volumes/workspace/default/northwind_schema/bronze_purchase_orders")
)
bronze.printSchema()
bronze.show(n=5, truncate=False)

root
 |-- PurchaseOrderID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- TotalDue: double (nullable = true)
 |-- approved_by: long (nullable = true)
 |-- approved_date_key: long (nullable = true)
 |-- created_by: long (nullable = true)
 |-- creation_date_key: long (nullable = true)
 |-- fact_purchase_order_key: long (nullable = true)
 |-- inventory_key: long (nullable = true)
 |-- payment_amount: long (nullable = true)
 |-- payment_date: string (nullable = true)
 |-- po_detail_date_received_key: long (nullable = true)
 |-- po_detail_posted_to_inventory: long (nullable = true)
 |-- po_detail_quantity: long (nullable = true)
 |-- po_detail_unit_cost: long (nullable = true)
 |-- product_key: long (nullable = true)
 |-- purchase_order_key: long (nullable = true)
 |-- purchase_order_status: string (nullable = true)
 |-- shipping_fee: long (nullable = true)
 |-- submitted_by: long (nul

In [0]:
# Re-run the Bronze round trip in a single cell: read the source JSON,
# overwrite the Delta files at the bronze_purchase_orders path, reload them
# into `bronze`, and print the first five rows.
df_flat = (
    spark.read
    .option("multiline", "true")
    .json("dbfs:/Volumes/workspace/default/northwind_schema/Northwind_Fact_PurchaseOrders01.json")
)

(
    df_flat.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .save("dbfs:/Volumes/workspace/default/northwind_schema/bronze_purchase_orders")
)

bronze = (
    spark.read
    .format("delta")
    .load("dbfs:/Volumes/workspace/default/northwind_schema/bronze_purchase_orders")
)
bronze.show(n=5)

+---------------+----------+--------+---------+--------+-----------+-----------------+----------+-----------------+-----------------------+-------------+--------------+------------+---------------------------+-----------------------------+------------------+-------------------+-----------+------------------+---------------------+------------+------------+------------------+------------+-----+
|PurchaseOrderID|EmployeeID|VendorID|OrderDate|TotalDue|approved_by|approved_date_key|created_by|creation_date_key|fact_purchase_order_key|inventory_key|payment_amount|payment_date|po_detail_date_received_key|po_detail_posted_to_inventory|po_detail_quantity|po_detail_unit_cost|product_key|purchase_order_key|purchase_order_status|shipping_fee|submitted_by|submitted_date_key|supplier_key|taxes|
+---------------+----------+--------+---------+--------+-----------+-----------------+----------+-----------------+-----------------------+-------------+--------------+------------+---------------------------

In [0]:
# Silver layer input: load the Bronze purchase orders from their Delta path so
# they can be transformed and combined with dimension tables, then print the
# schema and the first rows.

df_silver = (
    spark.read
    .format("delta")
    .load("dbfs:/Volumes/workspace/default/northwind_schema/bronze_purchase_orders")
)
df_silver.printSchema()
df_silver.show()

root
 |-- PurchaseOrderID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- TotalDue: double (nullable = true)
 |-- approved_by: long (nullable = true)
 |-- approved_date_key: long (nullable = true)
 |-- created_by: long (nullable = true)
 |-- creation_date_key: long (nullable = true)
 |-- fact_purchase_order_key: long (nullable = true)
 |-- inventory_key: long (nullable = true)
 |-- payment_amount: long (nullable = true)
 |-- payment_date: string (nullable = true)
 |-- po_detail_date_received_key: long (nullable = true)
 |-- po_detail_posted_to_inventory: long (nullable = true)
 |-- po_detail_quantity: long (nullable = true)
 |-- po_detail_unit_cost: long (nullable = true)
 |-- product_key: long (nullable = true)
 |-- purchase_order_key: long (nullable = true)
 |-- purchase_order_status: string (nullable = true)
 |-- shipping_fee: long (nullable = true)
 |-- submitted_by: long (nul

In [0]:
# Read the purchase-order JSON once more; df_flat and df_json_array both refer
# to the same DataFrame.
df_json_array = df_flat = (
    spark.read
    .option("multiline", "true")
    .json("dbfs:/Volumes/workspace/default/northwind_schema/Northwind_Fact_PurchaseOrders01.json")
)

In [0]:
# Register the Bronze purchase orders as the Delta table
# default.bronze_purchase_orders so they can be queried with SQL
# (completes the Load step of the ETL).
# The table is written once: repeating the identical overwrite a second time
# would only redo the same work.
df_flat.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("default.bronze_purchase_orders")

In [0]:
# Silver layer (Transform): left-join the raw purchase orders with the supplier
# dimension on supplier_key, so every purchase-order row is kept even when no
# supplier matches. Meets the requirement to join static reference data with
# the fact table at the Silver stage.
# NOTE(review): default.dim_suppliers is not created in the cells shown in this
# notebook section — confirm it is loaded before this cell runs.

from pyspark.sql.functions import col

suppliers = spark.table("default.dim_suppliers").alias("suppliers")

# Join, then drop the supplier-side copy of the key so only the Bronze
# supplier_key column remains.
df_silver_purchase = (
    bronze.alias("bronze")
    .join(
        suppliers,
        on=col("bronze.supplier_key") == col("suppliers.supplier_key"),
        how="left",
    )
    .drop(col("suppliers.supplier_key"))
)

# Expose the joined data to SQL as a temporary view.
df_silver_purchase.createOrReplaceTempView("silver_purchase_temp")

# Spot-check that supplier keys resolve to company names.
df_silver_purchase.select("supplier_key", "company").show(n=10, truncate=False)


+------------+----------+
|supplier_key|company   |
+------------+----------+
|1           |Supplier A|
|1           |Supplier A|
|1           |Supplier A|
|1           |Supplier A|
|1           |Supplier A|
|3           |Supplier C|
|3           |Supplier C|
|3           |Supplier C|
|3           |Supplier C|
|3           |Supplier C|
+------------+----------+
only showing top 10 rows


In [0]:
# Drop the remaining supplier_key column and save the result as the Delta
# table default.silver_purchase_orders.

df_silver_purchase_clean = df_silver_purchase.drop("supplier_key")
(
    df_silver_purchase_clean.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable("default.silver_purchase_orders")
)

In [0]:
# Needed import
from pyspark.sql.functions import expr, sum

In [0]:
# Invoice dimension: read the invoices JSON (multi-line records) from the DBFS
# volume and save it as the Delta table default.dim_invoices.
df_invoices = (
    spark.read
    .option("multiline", "true")
    .json("dbfs:/Volumes/workspace/default/northwind_schema/Northwind_DimInvoices.json")
)

(
    df_invoices.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("default.dim_invoices")
)


In [0]:
from pyspark.sql.functions import col

# NOTE(review): default.silver_orders is written by the FactOrders cell further
# down in this notebook, so on a fresh top-to-bottom run the table must already
# exist for this cell to succeed.
df_orders = spark.table("default.silver_orders").alias("orders")
df_invoices = spark.table("default.dim_invoices").alias("invoices")

# Left-join orders with invoices on order_key (orders without an invoice are
# kept), then drop the invoice-side copy of the key.
df_order_enriched = (
    df_orders
    .join(
        df_invoices,
        on=col("orders.order_key") == col("invoices.order_key"),
        how="left",
    )
    .drop(df_invoices["order_key"])
)

# Save the enriched orders as a Delta table.
(
    df_order_enriched.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable("default.silver_orders_with_invoices")
)


In [0]:
# Gold layer: total amount due per customer, summed over the invoice-enriched
# orders. Meets the "demonstrate business value through summarization" requirement.

from pyspark.sql.functions import sum

df_gold_customer_summary = (
    df_order_enriched
    .groupBy("customer_key")
    .agg(sum("amount_due").alias("total_amount_due"))
)

df_gold_customer_summary.show(truncate=False)

(
    df_gold_customer_summary.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable("default.gold_order_customer_summary")
)


+------------+----------------+
|customer_key|total_amount_due|
+------------+----------------+
|8           |0               |
|11          |0               |
|1           |0               |
|10          |0               |
|7           |NULL            |
|3           |0               |
|4           |0               |
|28          |0               |
|9           |0               |
|26          |0               |
|29          |0               |
|6           |0               |
|25          |0               |
+------------+----------------+



In [0]:
# Gold layer: per supplier company, the total quantity purchased and the total
# cost (quantity * unit cost summed over purchase-order lines). Supports the
# business-value and vendor-comparison requirement.

from pyspark.sql.functions import sum, expr

df_gold_supplier_summary = (
    df_silver_purchase
    .groupBy("company")
    .agg(
        sum("po_detail_quantity").alias("total_quantity_purchased"),
        sum(expr("po_detail_quantity * po_detail_unit_cost")).alias("total_cost"),
    )
)

df_gold_supplier_summary.show(truncate=False)

(
    df_gold_supplier_summary.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable("default.gold_supplier_summary")
)



+----------+------------------------+----------+
|company   |total_quantity_purchased|total_cost|
+----------+------------------------+----------+
|Supplier C|390                     |4800      |
|Supplier A|365                     |5370      |
|Supplier B|280                     |5960      |
+----------+------------------------+----------+



In [0]:
# Sanity check of the join keys: list the distinct supplier keys present in the
# Bronze purchase orders ...
(
    bronze
    .select("supplier_key")
    .distinct()
    .show()
)

# ... and the distinct (supplier_key, company) pairs in the supplier dimension.
(
    spark.table("default.dim_suppliers")
    .select("supplier_key", "company")
    .distinct()
    .show()
)


+------------+
|supplier_key|
+------------+
|           2|
|           1|
|           3|
+------------+

+------------+----------+
|supplier_key|   company|
+------------+----------+
|           7|Supplier G|
|           3|Supplier C|
|           1|Supplier A|
|           2|Supplier B|
|           4|Supplier D|
|           6|Supplier F|
|          10|Supplier J|
|           9|Supplier I|
|           8|Supplier H|
|           5|Supplier E|
+------------+----------+



In [0]:
# Verify the Silver purchase data before the Gold summarization: print the
# supplier company, quantity, unit cost and TotalDue for the first five rows.

(
    df_silver_purchase_clean
    .select("company", "po_detail_quantity", "po_detail_unit_cost", "TotalDue")
    .show(n=5, truncate=False)
)


+----------+------------------+-------------------+--------+
|company   |po_detail_quantity|po_detail_unit_cost|TotalDue|
+----------+------------------+-------------------+--------+
|Supplier A|40                |14                 |NULL    |
|Supplier A|60                |10                 |NULL    |
|Supplier A|100               |34                 |NULL    |
|Supplier A|125               |2                  |NULL    |
|Supplier A|40                |14                 |NULL    |
+----------+------------------+-------------------+--------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import col

# Load each FactOrders JSON file (multi-line records).
orders_2 = (
    spark.read
    .option("multiline", "true")
    .json("dbfs:/Volumes/workspace/default/northwind_schema/Northwind_FactOrders02.json")
)
orders_3 = (
    spark.read
    .option("multiline", "true")
    .json("dbfs:/Volumes/workspace/default/northwind_schema/Northwind_FactOrders03.json")
)

# Combine both files into one DataFrame, matching columns by name.
df_orders = orders_2.unionByName(orders_3)

# Save the combined orders as the Silver Delta table default.silver_orders.
(
    df_orders.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable("default.silver_orders")
)


In [0]:
# Check the orders/invoices join that feeds the Gold layer: print the order key,
# order date key, invoice date and amount due for the first five rows.
(
    df_order_enriched
    .select("order_key", "order_date_key", "invoice_date", "amount_due")
    .show(n=5, truncate=False)
)

+---------+--------------+-------------------+----------+
|order_key|order_date_key|invoice_date       |amount_due|
+---------+--------------+-------------------+----------+
|73       |20060605      |2006-04-04 11:38:32|0         |
|72       |20060607      |2006-04-04 11:38:53|0         |
|71       |20060524      |2006-04-04 11:39:29|0         |
|70       |20060524      |2006-04-04 11:39:53|0         |
|69       |20060524      |2006-04-04 11:40:16|0         |
+---------+--------------+-------------------+----------+
only showing top 5 rows
