<a href="https://colab.research.google.com/github/moinshaikh6872/Modern-Azure-Data-Pipeline-On-Premises-to-Delta-Lake-Analytics/blob/main/Silver_to_Gold_PySpark_Aggregation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Databricks Notebook: Silver to Gold Aggregation and Modeling

# This notebook reads the cleaned data from the Silver layer, applies business logic
# (aggregation, modeling), and writes the final analytical table to the Gold Delta Lake layer.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, year, month, lit, current_timestamp

# 1. Configuration Setup
# Root mount point under which the Medallion layers live (note the trailing slash).
BASE_PATH = "/mnt/adlssynapse/"
# Input: cleaned table produced by the Bronze-to-Silver step
SILVER_PATH = BASE_PATH + "silver/sales_data_cleaned"
# Output: final aggregated table consumed by BI reporting
GOLD_PATH = BASE_PATH + "gold/daily_sales_summary"

# Column names used by the aggregation below.
# DATE_COL should match the standardized date column written by the Silver step.
DATE_COL = "Standardized_ModifiedDate"
# Assumes the source data carries a sales-value column with this name.
SALES_AMOUNT_COL = "TotalAmount"

# 2. Read Cleaned Data from Silver Layer (Delta Format)
# Loads the Silver table into `df_silver` and aborts the run if it has no rows.
# Raises: whatever Spark raises if the table cannot be read (re-raised after logging),
#         or ValueError if the table loads but is empty.
try:
    print(f"Reading Delta table from Silver Layer: {SILVER_PATH}")

    # Reading Delta format for ACID compliance and schema enforcement benefits
    df_silver = spark.read.format("delta").load(SILVER_PATH)

    # count() is a Spark action that scans the table, so run it exactly once
    # and reuse the result for both the empty check and the log message.
    silver_record_count = df_silver.count()

except Exception as e:
    print(f"Error reading Silver Delta data: {e}")
    raise

# The empty-table check lives outside the try block so that it is not
# reported as a *read* failure by the handler above.
if silver_record_count == 0:
    print("Error: Silver layer is empty. Cannot proceed to Gold transformation.")
    raise ValueError("Silver layer data count is zero.")

print(f"Successfully loaded {silver_record_count} cleaned records.")

# 3. Apply Transformation Logic (Aggregation and Modeling)
# Business Logic: Create a daily sales summary for the Gold layer.
# Produces one row per DATE_COL value with the total sales amount and the
# number of distinct transactions (SalesOrderID values) for that value.
# NOTE(review): the grain is only "daily" if DATE_COL holds dates rather than
# timestamps — confirm against the Silver step.

# Local import keeps this step self-contained; countDistinct is needed so that
# an order spanning several rows is counted once.
from pyspark.sql.functions import countDistinct

df_gold_summary = df_silver.groupBy(DATE_COL).agg(
    # Total sales amount for the day.
    # `sum` here is pyspark.sql.functions.sum from the file's imports, not the builtin.
    sum(col(SALES_AMOUNT_COL)).alias("Daily_Revenue"),

    # Number of distinct transactions: each SalesOrderID is counted once,
    # and null IDs are ignored.
    countDistinct(col("SalesOrderID")).alias("Total_Transactions")
)

# 4. Add Dimensional/Metadata Columns (Best Practice for Gold)
# Each summary row gains the year and month derived from DATE_COL, a timestamp
# taken when the job runs, and a constant owner tag.
df_gold = (
    df_gold_summary
    .withColumn("Sales_Year", year(col(DATE_COL)))
    .withColumn("Sales_Month", month(col(DATE_COL)))
    .withColumn("Processed_Timestamp", current_timestamp())
    .withColumn("Data_Owner", lit("Moin Shaikh"))  # Example metadata column
)

# 5. Write Data to Gold Layer (Delta Lake Format)
print(f"Writing aggregated data to Gold Layer: {GOLD_PATH}")

# Full refresh: 'overwrite' mode replaces the existing Gold table on every run,
# and overwriteSchema=true is set alongside it on the writer.
# A Delta MERGE (upsert) is the common alternative for dimension tables.
(
    df_gold.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(GOLD_PATH)
)

print("Silver to Gold transformation completed successfully. Gold table ready for Synapse.")

# 6. Verification (Optional)
# df_check = spark.read.format("delta").load(GOLD_PATH)
# print("Schema of Gold Layer Data:")
# df_check.printSchema()
# print(f"First 5 rows of Gold Layer Data:")
# df_check.show(5)