In [0]:
# Make silverdb the current schema so the unqualified table names used in
# this notebook resolve against it.
spark.sql("USE silverdb")

# DDL for the silver-layer orders table. IF NOT EXISTS makes the statement a
# no-op when the table is already present, so the cell can be re-run.
create_silver_orders_ddl = """
CREATE TABLE IF NOT EXISTS silver_orders (
    transaction_id STRING,
    customer_id STRING,
    product_id STRING,
    quantity INT,
    total_amount DOUBLE,
    transaction_date DATE,
    payment_method STRING,
    store_type STRING,
    order_status STRING,
    last_updated TIMESTAMP
)
USING DELTA
"""
spark.sql(create_silver_orders_ddl)

DataFrame[]

In [0]:
# Watermark for the incremental load: the newest last_updated value that is
# already stored in silver_orders.
# NOTE(review): last_updated is written as CURRENT_TIMESTAMP() by the silver
# transform, while this value is later compared against the bronze
# ingestion_timestamp column -- confirm that mixing the two clocks cannot
# skip bronze rows that were committed late.
last_processed_df = spark.sql("SELECT MAX(last_updated) as last_processed FROM silver_orders")
max_last_updated = last_processed_df.first()["last_processed"]

# MAX() over an empty table yields NULL (None on the Python side); in that
# case fall back to a very old timestamp so the first run picks up all rows
# (presumably older than any bronze ingestion_timestamp -- verify).
last_processed_timestamp = (
    "1900-01-01T00:00:00.000+00:00" if max_last_updated is None else max_last_updated
)

In [0]:
# Expose only the bronze rows whose ingestion_timestamp is newer than the
# silver watermark as the temp view bronze_incremental_orders.
# The watermark is interpolated as a quoted literal; it is computed earlier in
# this notebook (MAX(last_updated) or a fixed default), not taken from user input.
incremental_view_sql = f"""
CREATE OR REPLACE TEMPORARY VIEW bronze_incremental_orders AS
SELECT *
FROM ravi_db_project.bronzedb.bronze_transaction WHERE ingestion_timestamp > '{last_processed_timestamp}'
"""
spark.sql(incremental_view_sql)

DataFrame[]

In [0]:
# Preview the incremental bronze batch selected by the temp view.
bronze_incremental_preview = spark.sql("select * from bronze_incremental_orders")
bronze_incremental_preview.display()

transaction_id,customer_id,product_id,quantity,total_amount,transaction_date,payment_method,store_type,ingestion_timestamp
TRX000063,234,67,2,550.83,2021-09-12T00:00:00Z,Bank Transfer,Online,2025-01-08T20:47:50.22Z
TRX000115,58,475,2,299.56,2022-07-31T00:00:00Z,Bank Transfer,Online,2025-01-08T20:47:50.22Z
TRX000126,29,609,2,706.21,2021-12-02T00:00:00Z,Bank Transfer,Online,2025-01-08T20:47:50.22Z
TRX000144,122,202,2,446.44,2022-09-24T00:00:00Z,Bank Transfer,Online,2025-01-08T20:47:50.22Z
TRX000311,378,719,2,945.18,2020-02-19T00:00:00Z,Bank Transfer,Online,2025-01-08T20:47:50.22Z
TRX000326,638,602,2,687.12,2021-07-10T00:00:00Z,Bank Transfer,Online,2025-01-08T20:47:50.22Z
TRX000544,665,392,2,719.01,2023-01-13T00:00:00Z,Bank Transfer,Online,2025-01-08T20:47:50.22Z
TRX000598,815,672,2,90.97,2022-01-18T00:00:00Z,Bank Transfer,Online,2025-01-08T20:47:50.22Z
TRX000632,538,787,2,131.37,2022-06-10T00:00:00Z,Bank Transfer,Online,2025-01-08T20:47:50.22Z
TRX000723,257,242,2,296.8,2021-08-30T00:00:00Z,Bank Transfer,Online,2025-01-08T20:47:50.22Z


In [0]:
# Data transformations applied to the incremental batch:
# - Quantity and total_amount normalization (negative values are set to 0).
# - transaction_date is cast to DATE to ensure a consistent date format.
# - order_status is derived from quantity and total_amount.
# - Data quality checks: records with a null transaction date, customer ID, or product ID are filtered out.


In [0]:
# Build the temp view silver_incremental_orders: the incremental bronze batch
# reshaped to the silver_orders schema.
#   * negative quantity / total_amount values are normalized to 0
#   * transaction_date is cast to DATE
#   * order_status is 'Cancelled' when the normalized quantity or amount is 0,
#     otherwise 'Completed'
#   * last_updated is stamped with the processing time
#   * rows with a NULL transaction_date, customer_id or product_id are dropped
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW silver_incremental_orders AS
SELECT
    transaction_id,
    customer_id,
    product_id,
    CASE        -- quantity normalization: negative values become 0
        WHEN quantity < 0 THEN 0
        ELSE quantity
    END AS quantity,
    CASE        -- total_amount normalization: negative values become 0
        WHEN total_amount < 0 THEN 0
        ELSE total_amount
    END AS total_amount,
    CAST(transaction_date AS DATE) AS transaction_date,
    payment_method,
    store_type,
    CASE        -- order status derivation
        -- quantity and total_amount resolve to the raw bronze columns here,
        -- not to the normalized aliases above, so negatives (stored as 0)
        -- must be matched with <= 0 to be flagged as Cancelled as well.
        WHEN quantity <= 0 OR total_amount <= 0 THEN 'Cancelled'
        ELSE 'Completed'
    END AS order_status,
    CURRENT_TIMESTAMP() AS last_updated
FROM bronze_incremental_orders
WHERE transaction_date IS NOT NULL
  AND customer_id IS NOT NULL
  AND product_id IS NOT NULL
""")

DataFrame[]

In [0]:
# Upsert the transformed batch into silver_orders, keyed on transaction_id:
# rows whose id already exists are overwritten, all others are inserted.
# NOTE(review): Delta MERGE can fail when several source rows match the same
# target row -- confirm transaction_id is unique within an incremental batch.
merge_silver_orders_sql = """
MERGE INTO silver_orders target
USING silver_incremental_orders source
ON target.transaction_id = source.transaction_id
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT *
"""
spark.sql(merge_silver_orders_sql)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
-- Inspect the contents of the silver table after the merge.
SELECT * FROM silver_orders

transaction_id,customer_id,product_id,quantity,total_amount,transaction_date,payment_method,store_type,order_status,last_updated
TRX000063,234,67,2,550.83,2021-09-12,Bank Transfer,Online,Completed,2025-01-08T22:47:34.291Z
TRX000115,58,475,2,299.56,2022-07-31,Bank Transfer,Online,Completed,2025-01-08T22:47:34.291Z
TRX000126,29,609,2,706.21,2021-12-02,Bank Transfer,Online,Completed,2025-01-08T22:47:34.291Z
TRX000144,122,202,2,446.44,2022-09-24,Bank Transfer,Online,Completed,2025-01-08T22:47:34.291Z
TRX000311,378,719,2,945.18,2020-02-19,Bank Transfer,Online,Completed,2025-01-08T22:47:34.291Z
TRX000326,638,602,2,687.12,2021-07-10,Bank Transfer,Online,Completed,2025-01-08T22:47:34.291Z
TRX000544,665,392,2,719.01,2023-01-13,Bank Transfer,Online,Completed,2025-01-08T22:47:34.291Z
TRX000598,815,672,2,90.97,2022-01-18,Bank Transfer,Online,Completed,2025-01-08T22:47:34.291Z
TRX000632,538,787,2,131.37,2022-06-10,Bank Transfer,Online,Completed,2025-01-08T22:47:34.291Z
TRX000723,257,242,2,296.8,2021-08-30,Bank Transfer,Online,Completed,2025-01-08T22:47:34.291Z
