In [0]:
%sql
DROP TABLE IF EXISTS stg_product; -- staging table to store 
CREATE TABLE IF NOT EXISTS stg_product (
  sku INT NOT NULL,
  product_name STRING,
  product_type STRING,
  description STRING,
  price DOUBLE
) USING DELTA;


### General constraint check

In [0]:
%sql
SELECT *
FROM silver.cleansed_sales
WHERE 
  product_type IS NULL
  OR total_revenue < 0                        
  OR sale_date IS NULL             

In [0]:
%sql
SELECT * 
FROM silver.cleansed_sales

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW viewProduct
AS
(
  SELECT
    sku,
    name,
    product_type,
    description,
    price
  FROM silver.cleansed_sales
)

In [0]:
spark.sql("INSERT INTO stg_product SELECT * FROM viewProduct")

### Create the `dim_product` table

In [0]:
%sql
CREATE TABLE IF NOT EXISTS gold.dim_product (
  sku INT NOT NULL,
  product_name STRING,
  product_type STRING,
  description STRING,
  price DOUBLE,
  starting_date DATE NOT NULL,
  ending_date DATE,
  activate BOOLEAN NOT NULL
) USING DELTA;


In [0]:
%sql
DROP TABLE IF EXISTS gold.fact_daily_sales;

CREATE TABLE IF NOT EXISTS gold.fact_daily_sales (
  sale_date DATE,
  sku INT NOT NULL,
  transaction_price DOUBLE,
  sold INT, 
  total_revenue DOUBLE
) USING DELTA;

In [0]:
# df = spark.read.table("silver.cleansed_sales")

# # Show the data
# df.show()

In [0]:
%sql
SELECT COUNT(*)
FROM gold.dim_product

In [0]:
processing_sale_date_str = '2025-05-19' # First run for our example

# Stage 1: Prepare current day's product attributes from silver.cleansed_sales
# We need a consistent view of product attributes for a SKU on a given sale_date.
# If a SKU appears multiple times, we pick one (e.g., based on highest price, or assume they are consistent).
query_stage_silver_products = f"""
        CREATE OR REPLACE TEMPORARY VIEW StagedSilverProductAttributes AS
        WITH RankedSilver AS (
            SELECT
                sku,
                name AS product_name, 
                product_type,         
                price,                -- transaction price in that particular day
                description,          
                sale_date,
                1 as rn -- row number (assume records are not duplicated)
            FROM silver.cleansed_sales
            WHERE sale_date = DATE'{processing_sale_date_str}'
        )
        SELECT
            sku,
            product_name,
            product_type,
            price,      -- representative price for the dim_product
            description,
            sale_date AS starting_date -- effective date from
        FROM RankedSilver
        WHERE rn = 1;

        """
spark.sql(query_stage_silver_products)
print("StagedSilverProductAttributes created for the current processing date.")


# Stage 2: MERGE into Gold.dim_product to handle SCD Type 2
# This MERGE will expire old records if attributes change.
query_dim_product_scd2_merge = f"""
        MERGE INTO gold.dim_product AS tgt
        USING (
            SELECT
                s.sku,
                s.product_name, 
                s.product_type,
                s.price,
                s.description,
                s.starting_date
            FROM StagedSilverProductAttributes s
        ) AS src
        ON tgt.sku = src.sku AND tgt.activate = TRUE

        -- SKU exists and is activated. Check if attributes changed.
        WHEN MATCHED AND (
            tgt.product_name <> src.product_name OR
            tgt.product_type <> src.product_type OR
            tgt.price <> src.price OR              -- Assuming transaction price from silver drives dim price
            tgt.description <> src.description
        )
        THEN UPDATE SET
            tgt.activate = FALSE,
            tgt.ending_date = DATE_SUB(src.starting_date, 1); -- Expire previous day
        """
spark.sql(query_dim_product_scd2_merge)
print("Dim_Product MERGE (for expiring old versions) completed.")


# Stage 3: INSERT new products OR new versions of changed products into Gold.Dim_Product
query_dim_product_scd2_insert = f"""
        INSERT INTO gold.dim_product (
            sku, product_name, product_type, price, description,
            starting_date, ending_date, activate
        )
        SELECT
            s.sku,
            s.product_name,
            s.product_type,
            s.price,
            s.description,
            s.starting_date,
            DATE'9999-12-31' AS ending_date, -- just random invalid date as a placeholder
            TRUE AS activate
        FROM StagedSilverProductAttributes s
        LEFT JOIN gold.dim_product existing_current_dim
            ON s.sku = existing_current_dim.sku AND existing_current_dim.activate = TRUE
        WHERE
            existing_current_dim.sku IS NULL -- new to Dim_Product
                                            -- or its previous current record was just expired by the MERGE,
                                            -- so no 'activate = TRUE' record exists for it now.
        ;
        """
spark.sql(query_dim_product_scd2_insert)
print("Dim_Product INSERT (for new/updated versions) completed.")


# Stage 4: Load Gold.Fact_Daily_Sales

# delete records (if there's any)
spark.sql(f"DELETE FROM gold.fact_daily_sales WHERE sale_date = DATE'{processing_sale_date_str}'")
print(f"Deleted existing fact records for {processing_sale_date_str} (if any).")

query_fact_sales_insert = f"""
        INSERT INTO gold.fact_daily_sales (
            sale_date, sku, transaction_price, sold, total_revenue
        )
        SELECT
            s.sale_date,
            s.sku, 
            s.price AS transaction_price,
            s.sold AS amount_sold,
            s.total_revenue
        FROM silver.cleansed_sales s
        JOIN gold.dim_product dp
            ON s.sku = dp.sku
            AND s.sale_date >= dp.starting_date  -- Product version was active on or before sale_date
            AND s.sale_date <= dp.ending_date    -- Product version was active on or before sale_date
        WHERE s.sale_date = DATE'{processing_sale_date_str}'
        """
spark.sql(query_fact_sales_insert)
print("Fact_Daily_Sales INSERT completed.")

# Clean up temp view
spark.sql("DROP VIEW StagedSilverProductAttributes")
print("Temp view dropped.")

In [0]:
%sql 
SELECT (*)
FROM gold.fact_daily_sales