In [0]:
# Notebook parameters, exposed as widgets so a job task can supply its own values.
dbutils.widgets.text("source_path", "/Volumes/workspace/ecommerce", "Source Path")
dbutils.widgets.dropdown(
    "layer",
    "bronze",
    ["bronze", "silver", "gold"],
    "Layer",
)

# Resolve the current widget values, in the same order the widgets were declared.
source_path, layer = (
    dbutils.widgets.get(widget_name) for widget_name in ("source_path", "layer")
)

print(f"Running ETL for layer: {layer} using source path: {source_path}")

from pyspark.sql.functions import current_timestamp, col, to_date, sum, count, countDistinct

def _volume_namespace(source_path, default="workspace.ecommerce"):
    """Return the ``catalog.schema`` namespace that *source_path* points at.

    A path shaped exactly like ``/Volumes/<catalog>/<schema>`` yields
    ``<catalog>.<schema>``, so the volume that gets created lives where the
    layer's Delta path (``{source_path}/<layer>/...``) will be written.
    Any other shape, or a name containing characters other than ASCII
    letters, digits and ``_``, falls back to *default* (the namespace that
    used to be hard-coded), which also keeps arbitrary widget input out of
    the SQL text.
    """
    parts = [part for part in str(source_path).split("/") if part]
    if len(parts) == 3 and parts[0] == "Volumes":
        names = parts[1:]
        if all(
            ch.isascii() and (ch.isalnum() or ch == "_")
            for name in names
            for ch in name
        ):
            return ".".join(names)
    return default


def _ensure_volume(source_path, volume_name):
    """Create the volume *volume_name* for this layer if it does not exist."""
    namespace = _volume_namespace(source_path)
    spark.sql(f"CREATE VOLUME IF NOT EXISTS {namespace}.{volume_name}")


def _run_bronze_layer(source_path):
    """Ingest the raw 2019-Oct CSV into the Bronze Delta location."""
    print("Executing Bronze Layer...")
    _ensure_volume(source_path, "bronze")

    # Read raw CSV data (header row present, column types inferred by Spark).
    raw_df = spark.read.csv(
        f"{source_path}/ecommerce_data/2019-Oct.csv",
        header=True,
        inferSchema=True,
    )

    # Add ingestion timestamp
    bronze_df = raw_df.withColumn("ingestion_time", current_timestamp())

    bronze_path = f"{source_path}/bronze/ecommerce_events"
    bronze_df.write.format("delta").mode("overwrite").save(bronze_path)

    # Sanity checks run on what was actually persisted. Running them on
    # `bronze_df` would re-read the CSV for every action and re-evaluate
    # `current_timestamp()`, so they would not describe the written rows.
    persisted_df = spark.read.format("delta").load(bronze_path)
    print("Bronze row count:", persisted_df.count())
    display(persisted_df.limit(10))
    persisted_df.printSchema()


def _run_silver_layer(source_path):
    """Clean the Bronze events and write them to the Silver Delta location."""
    print("Executing Silver Layer...")
    _ensure_volume(source_path, "silver")

    bronze_df = spark.read.format("delta").load(f"{source_path}/bronze/ecommerce_events")

    # Clean and validate: require a user_id, keep only the three known event
    # types, drop negative prices (NULL prices are kept), remove exact
    # duplicate rows.
    silver_df = (
        bronze_df
        .filter(col("user_id").isNotNull())
        .filter(col("event_type").isin("view", "cart", "purchase"))
        .filter((col("price").isNull()) | (col("price") >= 0))
        .dropDuplicates()
    )

    silver_path = f"{source_path}/silver/ecommerce_events_clean"
    silver_df.write.format("delta").mode("overwrite").save(silver_path)

    # Checks read the written table back instead of re-running the
    # filter + dropDuplicates plan once per action.
    persisted_df = spark.read.format("delta").load(silver_path)
    print("Bronze rows:", bronze_df.count())
    print("Silver rows:", persisted_df.count())
    print("Null user_id count:", persisted_df.filter(col("user_id").isNull()).count())
    persisted_df.groupBy("event_type").count().show()
    print("Negative price count:", persisted_df.filter(col("price") < 0).count())
    display(persisted_df.limit(10))


def _run_gold_layer(source_path):
    """Aggregate Silver purchase events into daily sales metrics (Gold)."""
    print("Executing Gold Layer...")
    _ensure_volume(source_path, "gold")

    silver_df = spark.read.format("delta").load(f"{source_path}/silver/ecommerce_events_clean")

    # One row per calendar date of purchase events. NOTE: `sum` and `count`
    # here are the pyspark.sql.functions versions imported at the top of the
    # file (they shadow the Python builtins). `total_orders` is the number of
    # purchase rows for the date.
    gold_df = (
        silver_df
        .filter(col("event_type") == "purchase")
        .withColumn("event_date", to_date("event_time"))
        .groupBy("event_date")
        .agg(
            sum("price").alias("total_revenue"),
            count("*").alias("total_orders"),
            countDistinct("user_id").alias("unique_customers"),
        )
    )

    gold_path = f"{source_path}/gold/daily_sales_metrics"
    gold_df.write.format("delta").mode("overwrite").save(gold_path)

    # Checks read the written table back instead of re-running the
    # aggregation once per action.
    persisted_df = spark.read.format("delta").load(gold_path)
    display(persisted_df.orderBy("event_date").limit(10))
    print(
        "Duplicate dates:",
        persisted_df.count() - persisted_df.select("event_date").distinct().count(),
    )
    persisted_df.select(
        "event_date",
        "total_revenue",
        "total_orders",
        "unique_customers",
    ).summary().show()


def run_layer(layer_name: str, source_path: str) -> None:
    """Run one layer of the Bronze -> Silver -> Gold ETL.

    Args:
        layer_name: Which layer to execute: ``"bronze"``, ``"silver"`` or
            ``"gold"``.
        source_path: Base path under which the raw CSV
            (``ecommerce_data/2019-Oct.csv``) is read and the per-layer
            Delta outputs (``bronze/``, ``silver/``, ``gold/``) are written.

    Raises:
        ValueError: If ``layer_name`` is not one of the three known layers.
            This is raised before any Spark work is started.
    """
    if layer_name == "bronze":
        _run_bronze_layer(source_path)
    elif layer_name == "silver":
        _run_silver_layer(source_path)
    elif layer_name == "gold":
        _run_gold_layer(source_path)
    else:
        raise ValueError(f"Unknown layer: {layer_name}")

# Run the single layer selected through the notebook widgets.
run_layer(layer_name=layer, source_path=source_path)


## **Instructions for Databricks Jobs UI:**

**Job Name:** ETL_Bronze_Silver_Gold

**Tasks:**
1. bronze_layer
   - Notebook: this notebook
   - Parameters: layer=bronze, source_path=/Volumes/workspace/ecommerce
2. silver_layer
   - Notebook: this notebook
   - Parameters: layer=silver, source_path=/Volumes/workspace/ecommerce
   - Dependency: bronze_layer
3. gold_layer
   - Notebook: this notebook
   - Parameters: layer=gold, source_path=/Volumes/workspace/ecommerce
   - Dependency: silver_layer


**Schedule the Job**

- In the Job UI, after creating the job, click **Add trigger** under the schedule and triggers section in the panel on the right.
- Set the trigger type to **Scheduled** and the schedule type to **Advanced**, then specify when the job should run.