In [2]:
# Step 1: Read Delta tables from the Silver lakehouse.
# NOTE(review): relative "Tables/..." and "Files/..." paths resolve against the
# notebook's default lakehouse. If the default is Silver (as the reads below
# imply), the "Files/gold_lake_house" writes in Step 3 land inside Silver's
# Files area, not a separate Gold lakehouse — confirm, and use an absolute
# (abfss://) Gold path if a different lakehouse is intended. Also note Delta
# data written under Files/ is not auto-registered as a lakehouse table.
SILVER_TABLE_PATHS = {
    "product": "Tables/product_delta",
    "region": "Tables/region_delta",
    "sales": "Tables/sales_delta",
}
GOLD_BASE_PATH = "Files/gold_lake_house"

silver_dfs = {
    table_name: spark.read.format("delta").load(table_path)
    for table_name, table_path in SILVER_TABLE_PATHS.items()
}

# Keep the per-table DataFrame names that other cells may reference.
product_df = silver_dfs["product"]
region_df = silver_dfs["region"]
sales_df = silver_dfs["sales"]

# Step 2: Register each table as a temp view; the SQL aggregations below
# query them as `product`, `region`, and `sales`.
for table_name, table_df in silver_dfs.items():
    table_df.createOrReplaceTempView(table_name)

# Step 3: Copy the raw tables unchanged into the Gold area (full refresh).
for table_name, table_df in silver_dfs.items():
    (
        table_df.write.format("delta")
        .mode("overwrite")
        # Without this, Delta rejects an overwrite whose schema differs from
        # the existing target, so a re-run after a Silver schema change fails.
        .option("overwriteSchema", "true")
        .save(f"{GOLD_BASE_PATH}/{table_name}")
    )

# Step 4: SQL aggregation — total sales per product.
# Reads the `sales` and `product` temp views registered earlier in the notebook.
# The inner JOIN drops sales rows whose product_key has no match in `product`.
# NOTE(review): assumes product_key is unique in `product` — TODO confirm;
# duplicate product rows would multiply matching sales and inflate totals.
agg1 = spark.sql("""
    SELECT 
        p.product_key,
        p.product_name,
        SUM(s.sales_amount) AS total_sales
    FROM sales s
    JOIN product p ON s.product_key = p.product_key
    GROUP BY p.product_key, p.product_name
""")

# Step 5: Save the aggregation as a Delta table in the Gold area (full refresh).
(
    agg1.write.format("delta")
    .mode("overwrite")
    # Allow the output schema to change (e.g. a renamed/added column) on
    # re-run; Delta otherwise fails the overwrite on a schema mismatch.
    .option("overwriteSchema", "true")
    .save("Files/gold_lake_house/agg_total_sales_per_product")
)

# Step 6: SQL aggregation — total sales per (region, country).
# Reads the `sales` and `region` temp views registered earlier in the notebook.
# The inner JOIN on sales_territory_key drops sales rows with no matching
# territory. Several territories sharing the same region/country are summed
# together by the GROUP BY.
# NOTE(review): assumes sales_territory_key is unique in `region` — TODO
# confirm; duplicates would double-count sales.
agg2 = spark.sql("""
    SELECT 
        r.region,
        r.country,
        SUM(s.sales_amount) AS total_sales_region
    FROM sales s
    JOIN region r ON s.sales_territory_key = r.sales_territory_key
    GROUP BY r.region, r.country
""")

# Step 7: Save the aggregation as a Delta table in the Gold area (full refresh).
(
    agg2.write.format("delta")
    .mode("overwrite")
    # Allow the output schema to change on re-run; Delta otherwise fails the
    # overwrite on a schema mismatch.
    .option("overwriteSchema", "true")
    .save("Files/gold_lake_house/agg_total_sales_per_region")
)



StatementMeta(, 91baeeae-095d-4eed-ac6a-3716ee69b736, 4, Finished, Available, Finished)