In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


def _read_bronze_csv(path):
    """Read a raw CSV file from the Bronze folder into a Spark DataFrame.

    The first row supplies column names. Column types are inferred by Spark
    (``inferSchema``), so they can change if the file's contents change.

    Args:
        path: Lakehouse-relative path of the CSV file.

    Returns:
        A DataFrame holding the file's rows.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(path)
    )


# 1. Load Components (raw CSV file from the Bronze folder)
components_bronze = _read_bronze_csv("Files/bronze/Components_Extended.csv")

# 2. Load Product BOM (raw CSV file from the Bronze folder)
bom_bronze = _read_bronze_csv("Files/bronze/ProductBOM_Extended.csv")

print("Components (Bronze):")
components_bronze.show(5)

print("Product BOM (Bronze):")
bom_bronze.show(5)

# Persist the raw files as Delta tables. Because types are inferred from the file,
# a re-run on an updated file may produce different column types; overwriteSchema
# lets the overwrite replace the table schema instead of failing with a Delta
# schema-mismatch error.
components_bronze.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true").saveAsTable("bronze_components")
bom_bronze.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true").saveAsTable("bronze_bom")


StatementMeta(, 76a144c7-1edd-4e1b-8341-ca45077f12ff, 10, Finished, Available, Finished)

Components (Bronze):
+-----------+-------------+--------+---------+
|ComponentID|ComponentName|UnitCost| Supplier|
+-----------+-------------+--------+---------+
|       C001|  Component_1|    2.61|SupplierD|
|       C002|  Component_2|   13.34|SupplierD|
|       C003|  Component_3|    65.5|SupplierB|
|       C004|  Component_4|   32.83|SupplierB|
|       C005|  Component_5|    86.7|SupplierB|
+-----------+-------------+--------+---------+
only showing top 5 rows

Product BOM (Bronze):
+---------+-----------+-----------+--------+-------+----------+
|ProductID|ProductName|ComponentID|Quantity|Variant|IsOptional|
+---------+-----------+-----------+--------+-------+----------+
|     P013|Widget_P013|       C011|       1|   Lite|         1|
|     P013|Widget_P013|       C050|       3|    Std|         0|
|     P002|Widget_P002|       C032|       0|    Std|         0|
|     P019|Widget_P019|       C023|       4|   Lite|         0|
|     P011|Widget_P011|       C033|       0|   Lite|         

In [10]:
from pyspark.sql.functions import when, col, coalesce, lit, round as _round

# Silver layer: enrich every BOM line with component master data and derived cost columns.
# Inputs are read from the persisted Bronze Delta tables (not the in-memory DataFrames of the
# Bronze cell) so this cell also runs correctly after a kernel restart.
bom_src = spark.read.table("bronze_bom")
components_src = spark.read.table("bronze_components")

# Guard: a ComponentID listed more than once in the component master would make the left
# join below duplicate BOM lines and silently inflate every downstream cost.
# Null IDs are excluded because they never match in the join.
duplicate_component_ids = (
    components_src
    .filter(col("ComponentID").isNotNull())
    .groupBy("ComponentID")
    .count()
    .filter(col("count") > 1)
    .limit(1)
    .count()
)
assert duplicate_component_ids == 0, "bronze_components has duplicate ComponentID values"

# 1. Join BOM with Components (left join keeps all BOM rows, including unknown components)
joined = bom_src.join(components_src, on="ComponentID", how="left")

joined = (
    joined
    # 2. MissingFlag = 1 when no UnitCost is available after the join
    #    (ComponentID absent from the master, or present with a null cost)
    .withColumn("MissingFlag", when(col("UnitCost").isNull(), 1).otherwise(0))
    # 3. Replace null UnitCost with 0.0 so a missing cost does not make RowCost null
    .withColumn("UnitCost", coalesce(col("UnitCost"), lit(0.0)))
    # 4. RowCost = Quantity * UnitCost, rounded to 2 dp
    .withColumn("RowCost", _round(col("Quantity") * col("UnitCost"), 2))
    # 5. Variant discount: "Lite" lines get 10% off (0.9 multiplier), all others 1.0
    .withColumn("VariantMultiplier", when(col("Variant") == "Lite", 0.9).otherwise(1.0))
    .withColumn("VariantRowCost", _round(col("RowCost") * col("VariantMultiplier"), 2))
    # 6. OptionalFlag = 1 if the line is marked optional or has zero quantity
    .withColumn(
        "OptionalFlag",
        when((col("IsOptional") == 1) | (col("Quantity") == 0), 1).otherwise(0),
    )
)

joined.write.format("delta").mode("overwrite").saveAsTable("silver_bom")


StatementMeta(, 76a144c7-1edd-4e1b-8341-ca45077f12ff, 12, Finished, Available, Finished)

In [5]:
# Export the persisted Silver BOM table as a headered CSV under Files/silver.
# coalesce(1) collapses the data to a single partition so the export is one part file.
silver = spark.table("silver_bom")
(
    silver
    .coalesce(1)
    .write
    .csv("Files/silver/silver_bom_csv", mode="overwrite", header=True)
)

StatementMeta(, 4168da64-47f3-463f-af3e-1906b99a7fb7, 7, Finished, Available, Finished)

In [13]:
# Preview the first 10 rows of the persisted Silver table.
silver = spark.table("silver_bom")
silver.show(n=10)


StatementMeta(, 76a144c7-1edd-4e1b-8341-ca45077f12ff, 15, Finished, Available, Finished)

+-----------+---------+-----------+--------+-------+----------+-------------+--------+---------+-----------+-------+-----------------+--------------+------------+
|ComponentID|ProductID|ProductName|Quantity|Variant|IsOptional|ComponentName|UnitCost| Supplier|MissingFlag|RowCost|VariantMultiplier|VariantRowCost|OptionalFlag|
+-----------+---------+-----------+--------+-------+----------+-------------+--------+---------+-----------+-------+-----------------+--------------+------------+
|       C011|     P013|Widget_P013|       1|   Lite|         1| Component_11|   54.69|SupplierB|          0|  54.69|              0.9|         49.22|           1|
|       C050|     P013|Widget_P013|       3|    Std|         0| Component_50|   94.31|SupplierD|          0| 282.93|              1.0|        282.93|           0|
|       C032|     P002|Widget_P002|       0|    Std|         0| Component_32|   12.51|SupplierA|          0|    0.0|              1.0|           0.0|           1|
|       C023|     P019

In [2]:
from pyspark.sql.functions import sum as _sum, count, col, round as _round

# Load Silver
silver = spark.read.table("silver_bom")

# Aggregate to one row per (ProductID, ProductName, Variant) for the Gold layer.
# TotalCost is rounded to 2 dp, matching the 2-dp row costs in Silver: summing doubles
# leaves binary floating-point residue (e.g. 0.1 + 0.2 == 0.30000000000000004) that would
# otherwise surface in the Gold table and its CSV exports.
gold = (
    silver.groupBy("ProductID", "ProductName", "Variant")
    .agg(
        _round(_sum("VariantRowCost"), 2).alias("TotalCost"),
        _sum("MissingFlag").alias("MissingCount"),
        _sum("OptionalFlag").alias("OptionalCount"),
        _sum("Quantity").alias("TotalQuantity"),
    )
)

# Save as Gold table
gold.write.format("delta").mode("overwrite").saveAsTable("gold_product_summary")


StatementMeta(, 4168da64-47f3-463f-af3e-1906b99a7fb7, 4, Finished, Available, Finished)

In [3]:
# Preview up to 1000 rows of the Gold summary, addressed by its lakehouse-qualified name.
gold_preview_sql = "SELECT * FROM ProductBOM_Lakehouse.gold_product_summary LIMIT 1000"
df = spark.sql(gold_preview_sql)
display(df)

StatementMeta(, 4168da64-47f3-463f-af3e-1906b99a7fb7, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f53a1bfe-c7e8-4c3d-9d77-6af6b88e3e2b)

In [4]:
# Export the persisted Gold product summary as a headered CSV under Files/gold.
# coalesce(1) collapses the data to a single partition so the export is one part file.
gold = spark.table("gold_product_summary")
(
    gold
    .coalesce(1)
    .write
    .csv("Files/gold/gold_product_summary_csv", mode="overwrite", header=True)
)


StatementMeta(, 4168da64-47f3-463f-af3e-1906b99a7fb7, 6, Finished, Available, Finished)

In [1]:
# Export the Gold product summary as a single headered CSV to Files/gold_export_csv.
# NOTE(review): the same table is also exported to Files/gold/gold_product_summary_csv
# elsewhere in this notebook — confirm both target locations are needed downstream.
gold = spark.table("gold_product_summary")
gold.coalesce(1).write.csv("Files/gold_export_csv", header=True, mode="overwrite")

StatementMeta(, 5eeea6ec-db54-4c57-b961-56a54cedd120, 3, Finished, Available, Finished)