## Create a Databricks notebook to load the Silver layer: Product delta table

In [0]:
# Make globalretail_silver the current database so the unqualified table
# names used by the statements below resolve against it.
spark.sql("USE globalretail_silver")

# DDL for the Silver product table. IF NOT EXISTS means re-running this cell
# does not recreate a table that is already there.
create_silver_products_ddl = """
    CREATE TABLE IF NOT EXISTS silver_products(
      product_id STRING,
      name STRING,
      category STRING,
      brand STRING,
      price DOUBLE,
      stock_quantity INT,
      rating DOUBLE,
      is_active BOOLEAN,
      price_grocery STRING,
      stock_status STRING,     
      last_updated TIMESTAMP
    )
USING DELTA
"""
spark.sql(create_silver_products_ddl)

In [0]:
# High-water mark: the newest last_updated value currently stored in silver_products.
last_processed_df = spark.sql("SELECT MAX(last_updated) AS last_processed FROM silver_products")
newest_silver_update = last_processed_df.first()["last_processed"]

# MAX() comes back as None when there is nothing to aggregate yet; in that case
# fall back to a very old timestamp so the first run picks up every Bronze row.
last_processed_timestamp = (
    "1900-01-01T00:00:00.000+0000"
    if newest_silver_update is None
    else newest_silver_update
)

In [0]:
# Temporary view over the Bronze product rows whose ingestion_timestamp is at or
# after the Silver high-water mark computed in the previous cell.
incremental_view_sql = f"""
    CREATE OR REPLACE TEMPORARY VIEW bronze_incremental_products AS
    SELECT * 
    FROM globalretail_bronze.bronze_products p
    WHERE p.ingestion_timestamp >= '{last_processed_timestamp}'
"""
spark.sql(incremental_view_sql)

- **Data Transformation:**
  - Normalize price (set negative prices to 0)
  - Normalize stock quantity (set negative stock values to 0)
  - Clamp rating values between 0 and 5
  - Categorize price (Premium, Standard, Budget)
  - Determine stock status (Out of Stock, Low Stock, Moderate Stock, Sufficient Stock)

In [0]:
# Transform data: shape the incremental Bronze rows into the Silver schema.
#   - price, stock_quantity: negative values are replaced with 0
#   - rating: clamped to the range [0, 5]
#   - price_grocery: price band (Premium > 1000, Standard > 100, otherwise Budget)
#   - stock_status: band derived from the stock level; a non-positive quantity is
#     'Out of Stock', so rows whose negative quantity was normalized to 0 get a
#     status that agrees with the stored quantity
#   - last_updated: stamped with the processing time
# Rows with a NULL name or category are dropped.
# Note: the CASE conditions read the source columns, not the cleaned aliases,
# which is why the stock check must use <= 0 rather than = 0.
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW silver_incremental_products AS
    SELECT
      product_id,
      name,
      category,
      brand,
      CASE
        WHEN price < 0 THEN 0
        ELSE price
      END AS price,
      CASE
        WHEN stock_quantity < 0 THEN 0
        ELSE stock_quantity
      END AS stock_quantity,
      CASE
        WHEN rating < 0 THEN 0
        WHEN rating > 5 THEN 5
        ELSE rating
      END AS rating,
      is_active,
      CASE
        WHEN price > 1000 THEN 'Premium'
        WHEN price > 100 THEN 'Standard'
        ELSE 'Budget'
      END AS price_grocery,
      CASE
        WHEN stock_quantity <= 0 THEN 'Out of Stock'
        WHEN stock_quantity < 10 THEN 'Low Stock'
        WHEN stock_quantity < 50 THEN 'Moderate Stock'
        ELSE 'Sufficient Stock'
      END AS stock_status,
      current_timestamp() AS last_updated
    FROM bronze_incremental_products
    WHERE name IS NOT NULL AND category IS NOT NULL
""")

In [0]:
# Upsert the transformed batch into silver_products, keyed on product_id:
# rows that already exist are overwritten with the incoming values, and rows
# with a new product_id are inserted.
# NOTE(review): presumably the batch holds at most one row per product_id -- confirm.
upsert_products_sql = """
    MERGE INTO silver_products t
    USING silver_incremental_products s
    ON t.product_id = s.product_id
    WHEN MATCHED THEN
      UPDATE SET *
    WHEN NOT MATCHED THEN
      INSERT *
"""
spark.sql(upsert_products_sql)

In [0]:
%sql
-- Inspect the contents of the Silver product table after the merge.
SELECT *
FROM globalretail_silver.silver_products