In [0]:
# Create the raw Delta table with Change Data Feed enabled
# ('delta.enableChangeDataFeed'), so row-level changes made by later MERGEs
# can be read back as a change feed.
# `IF NOT EXISTS` makes the cell safe to re-run: if the table already exists the
# statement is a no-op (its data and properties are left as they are).
RAW_TABLE = "test_gds.default.raw_upi_transactions_v1"

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {RAW_TABLE}
( transaction_id STRING,
  upi_id STRING,
  merchant_id STRING,
  transaction_amount DOUBLE,
  transaction_timestamp TIMESTAMP,
  transaction_status STRING
)
USING delta
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")
# Report the same table name that the DDL above used.
print(f"Delta table '{RAW_TABLE}' created with CDC enabled.")


In [0]:
from delta import *
from pyspark.sql import Window
from pyspark.sql import functions as F

# Create mock source data as a list of batches (one Spark DataFrame per batch).
# Later batches revisit transaction_ids from earlier ones with a new status
# (e.g. T001: initiated -> completed -> refunded) and introduce new ids (T004),
# so the batches are presumably meant to be merged in list order.
MOCK_COLUMNS = [
    "transaction_id",
    "upi_id",
    "merchant_id",
    "transaction_amount",
    "transaction_timestamp",
    "transaction_status",
]


def _mock_batch(rows):
    """Return one mock batch as a DataFrame with the raw table's columns.

    Timestamps are written as "yyyy-MM-dd HH:mm:ss" strings for readability.
    They are parsed with `to_timestamp` (in the Spark session time zone), so
    `transaction_timestamp` is TIMESTAMP, like the target table's column,
    rather than STRING. The MERGE therefore does not rely on an implicit
    string-to-timestamp cast.
    """
    return spark.createDataFrame(rows, MOCK_COLUMNS).withColumn(
        "transaction_timestamp", F.to_timestamp("transaction_timestamp")
    )


mock_data = [
    # Batch 0: three new transactions.
    _mock_batch([
        ("T001", "upi1@bank", "M001", 500.0, "2024-12-21 10:00:00", "initiated"),
        ("T002", "upi2@bank", "M002", 1000.0, "2024-12-21 10:05:00", "initiated"),
        ("T003", "upi3@bank", "M003", 1500.0, "2024-12-21 10:10:00", "initiated"),
    ]),
    # Batch 1: status updates for T001/T002 plus a new transaction T004.
    _mock_batch([
        ("T001", "upi1@bank", "M001", 500.0, "2024-12-21 10:15:00", "completed"),   # Update
        ("T002", "upi2@bank", "M002", 1000.0, "2024-12-21 10:20:00", "failed"),     # Update
        ("T004", "upi4@bank", "M004", 2000.0, "2024-12-21 10:25:00", "initiated"),  # New
    ]),
    # Batch 2: T001 refunded, T003 completed.
    _mock_batch([
        ("T001", "upi1@bank", "M001", 500.0, "2024-12-21 10:30:00", "refunded"),    # Refund issued
        ("T003", "upi3@bank", "M003", 1500.0, "2024-12-21 10:35:00", "completed"),  # Completed
    ]),
]


# Merge logic
def merge_to_delta_table(batch_data: str, batch_no):
    """Upsert one batch of transactions into a Delta table, keyed on transaction_id.

    NOTE: the parameter names are misleading but are kept so existing callers
    (positional or keyword) keep working.

    Args:
        batch_data: Name of the target Delta table, as accepted by
            `DeltaTable.forName` (e.g. "catalog.schema.table"). This is the
            table name, not the batch data.
        batch_no: The source batch as a Spark DataFrame (not a batch number).
            It must contain transaction_id, upi_id, merchant_id,
            transaction_amount, transaction_timestamp and transaction_status.

    Behaviour:
        * A source row whose transaction_id already exists in the target
          overwrites every non-key column of that target row.
        * A source row with a new transaction_id is inserted as-is.
        * If the batch holds several rows for the same transaction_id, only
          the one with the latest transaction_timestamp is merged. Without
          this step, Delta MERGE fails when several source rows match the
          same target row, and inserts duplicate keys when they match none.
    """
    delta_table = DeltaTable.forName(spark, batch_data)

    # Keep only the latest event per transaction_id within this batch.
    # Rows with identical timestamps for the same id are tie-broken arbitrarily.
    latest_first = Window.partitionBy("transaction_id").orderBy(
        F.col("transaction_timestamp").desc()
    )
    source = (
        batch_no
        .withColumn("_rn", F.row_number().over(latest_first))
        .filter(F.col("_rn") == 1)
        .drop("_rn")
    )

    (
        delta_table.alias("target")
        .merge(source.alias("source"), "target.transaction_id = source.transaction_id")
        # transaction_id is the join key, so it is deliberately not in the update set.
        .whenMatchedUpdate(
            set={
                "upi_id": "source.upi_id",
                "merchant_id": "source.merchant_id",
                "transaction_amount": "source.transaction_amount",
                "transaction_timestamp": "source.transaction_timestamp",
                "transaction_status": "source.transaction_status",
            }
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

# Merge a single batch into the raw table. Only mock_data[BATCH_INDEX] is applied
# here: on a fresh table the earlier batches are not merged by this cell, so
# apply them first (presumably by running it with BATCH_INDEX = 0, then 1, then 2).
RAW_TABLE = "test_gds.default.raw_upi_transactions_v1"
BATCH_INDEX = 2

merge_to_delta_table(RAW_TABLE, mock_data[BATCH_INDEX])
print(f"Batch {BATCH_INDEX} processed successfully.")