In [0]:
%run ../01-Config/02-config

In [0]:
# Silver-layer product dimension table. The DDL uses IF NOT EXISTS, so
# re-running this cell does not recreate a table that is already there.
create_products_ddl = """CREATE TABLE IF NOT EXISTS ECOMMERCE_DB.SILVER_DB.products (
    product_id STRING,
    name STRING,
    category STRING,
    brand STRING,
    price DOUBLE,
    stock_quantity INT,
    rating DOUBLE,
    is_active BOOLEAN,
    price_category STRING,
    stock_status STRING,
    last_updated TIMESTAMP
)"""

# Execute the statement in Snowflake through the connector's JVM-side Utils helper.
snowflake_utils = spark._jvm.net.snowflake.spark.snowflake.Utils
snowflake_utils.runQuery(snowflake_config, create_products_ddl)

In [0]:
# NOTE(review): this looks up the watermark in a Spark table/view called
# `silver_products`, which nothing visible in this notebook creates, and the
# value is reassigned by the Snowflake-based lookup in the following cell.
# Confirm whether this cell is still required.
last_processed_df = spark.sql(
    "SELECT MAX(last_updated) as last_processed FROM silver_products"
)
watermark_row = last_processed_df.collect()[0]

# MAX() over an empty table yields NULL; use a far-past timestamp in that case.
last_processed_timestamp = (
    watermark_row['last_processed']
    if watermark_row['last_processed'] is not None
    else "1900-01-01T00:00:00.000+00:00"
)

In [0]:
# Read the high-water mark (latest `last_updated`) of the silver products table
# from Snowflake. Only bronze rows ingested after it are processed downstream.
watermark_query = "SELECT MAX(last_updated) as last_processed FROM ECOMMERCE_DB.SILVER_DB.PRODUCTS"
last_processed_df = (
    spark.read.format("snowflake")
    .options(**snowflake_config)
    .option("query", watermark_query)
    .load()
)
# Display the schema to check the column names
last_processed_df.printSchema()

# Collect once: calling count() and then collect() would send the same query
# to Snowflake twice. An empty result is still handled by the `watermark_rows`
# truthiness check below.
watermark_rows = last_processed_df.collect()

last_processed_timestamp = None
if watermark_rows and 'LAST_PROCESSED' in watermark_rows[0]:
    last_processed_timestamp = watermark_rows[0]['LAST_PROCESSED']

# No rows, no such column, or MAX() returned NULL (empty table): fall back to a
# far-past timestamp so the incremental filter selects every bronze row.
if last_processed_timestamp is None:
    last_processed_timestamp = "1900-01-01T00:00:00.000+00:00"

In [0]:
# Fetch only the bronze product rows whose ingestion timestamp is later than
# the silver watermark computed above.
incremental_query = f"""
SELECT *
FROM ECOMMERCE_DB.BRONZE.products c 
WHERE c.ingestion_timestamp > '{last_processed_timestamp}'
"""
bronze_reader = spark.read.format("snowflake").options(**snowflake_config)
product_view = bronze_reader.option("query", incremental_query).load()

In [0]:
# Register the incremental bronze rows as a temp view so the transformation
# below can be written in Spark SQL, then print a sample for inspection.
product_view.createOrReplaceTempView("bronze_product_incremental")
bronze_preview = spark.sql("select * from bronze_product_incremental")
bronze_preview.show()

Data Transformations:
   - Row filtering (dropping rows with a missing name or category)
   - Price normalization (setting negative prices to 0)
   - Stock quantity normalization (setting negative stock to 0)
   - Rating normalization (clamping between 0 and 5)
   - Price categorization (Premium, Standard, Budget)
   - Stock status calculation (Out of Stock, Low Stock, Moderate Stock, Sufficient Stock)


In [0]:
# Build the silver-layer view from the incremental bronze rows.
#
# Inside a single SELECT, the CASE expressions for price_category and
# stock_status read the *source* columns, not the normalized aliases defined
# earlier in the same SELECT list. A negative stock_quantity is stored as 0,
# so the status check uses `<= 0`: otherwise such a row would be labelled
# 'Low Stock' while its stored quantity is 0. (A negative price already falls
# into 'Budget', which matches its normalized value of 0.)
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW silver_incremental_products AS
SELECT
    product_id,
    name,
    category,
    brand,
    CASE
        WHEN price < 0 THEN 0
        ELSE price
    END AS price,
    CASE
        WHEN stock_quantity < 0 THEN 0
        ELSE stock_quantity
    END AS stock_quantity,
    CASE
        WHEN rating < 0 THEN 0
        WHEN rating > 5 THEN 5
        ELSE rating
    END AS rating,
    is_active,
    CASE
        WHEN price > 1000 THEN 'Premium'
        WHEN price > 100 THEN 'Standard'
        ELSE 'Budget'
    END AS price_category,
    CASE
        WHEN stock_quantity <= 0 THEN 'Out of Stock'
        WHEN stock_quantity < 10 THEN 'Low Stock'
        WHEN stock_quantity < 50 THEN 'Moderate Stock'
        ELSE 'Sufficient Stock'
    END AS stock_status,
    CURRENT_TIMESTAMP() AS last_updated
FROM bronze_product_incremental
WHERE name IS NOT NULL AND category IS NOT NULL
""")

In [0]:
# Load the current contents of the silver products table from Snowflake.
target_query = "SELECT * FROM ECOMMERCE_DB.SILVER_DB.PRODUCTS"
target_df = (
    spark.read.format("snowflake")
    .options(**snowflake_config)
    .option("query", target_query)
    .load()
)

display(target_df)

source_cust = spark.sql("select * from silver_incremental_products")

# Split the incremental rows by whether their product_id already exists in
# the target table:
#   * left_semi -> source rows WITH a match    (updates)
#   * left_anti -> source rows WITHOUT a match (new products)
# A right join would return every source row, so the "new" set would also
# contain the updates. Semi/anti joins return only the source columns and
# never multiply rows when the target holds duplicate keys.
upd_cust = source_cust.join(target_df, "product_id", "left_semi")
new_cust = source_cust.join(target_df, "product_id", "left_anti")

display(upd_cust)
display(new_cust)


In [0]:
# Upsert the incremental rows into the silver products table in Snowflake.
#
# Writing the updated rows with mode("overwrite") would replace the WHOLE
# products table with just those rows, discarding every untouched product and
# the rows appended a moment earlier. Instead, the changed rows are written
# to a staging table and merged into the target by product_id.
silver_columns = [
    "product_id", "name", "category", "brand", "price", "stock_quantity",
    "rating", "is_active", "price_category", "stock_status", "last_updated",
]

# Combine new and updated rows. Keep one row per product_id so the MERGE
# never sees several source rows for the same key (which row survives among
# duplicates is arbitrary).
changed_products = (
    new_cust.unionByName(upd_cust)
    .select(*silver_columns)
    .dropDuplicates(["product_id"])
)

# The staging table is fully replaced on every run.
changed_products.write \
    .format("snowflake") \
    .options(**snowflake_config) \
    .option("dbtable", 'products_staging') \
    .option("sfDatabase", 'ecommerce_db') \
    .option("sfSchema", 'silver_db') \
    .mode("overwrite") \
    .save()

# Build the MERGE statement from the column list so UPDATE and INSERT stay in sync.
update_assignments = ",\n    ".join(
    f"{column_name} = src.{column_name}"
    for column_name in silver_columns
    if column_name != "product_id"
)
insert_columns = ", ".join(silver_columns)
insert_values = ", ".join(f"src.{column_name}" for column_name in silver_columns)

merge_query = f"""MERGE INTO ECOMMERCE_DB.SILVER_DB.PRODUCTS tgt
USING ECOMMERCE_DB.SILVER_DB.PRODUCTS_STAGING src
    ON tgt.product_id = src.product_id
WHEN MATCHED THEN UPDATE SET
    {update_assignments}
WHEN NOT MATCHED THEN INSERT ({insert_columns})
    VALUES ({insert_values})"""

spark._jvm.net.snowflake.spark.snowflake.Utils.runQuery(snowflake_config, merge_query)


