<a href="https://colab.research.google.com/github/shrishaameenaa-cmd/Data_Processing_Challenge/blob/main/23BCS160_Incremental_Data_Processing_Challenge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark




In [2]:
import time

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, lit, when


In [3]:
# Obtain the Spark session for this notebook. getOrCreate() returns the
# already-running session if there is one, so re-running this cell is safe.
spark = (
    SparkSession.builder
    .appName("CDC_Simulation")
    .getOrCreate()
)


In [4]:
# Starting snapshot of the customer table, one tuple per customer.
# It is assembled in pandas first and then handed to Spark.
initial_data = pd.DataFrame(
    [
        (1, 1000, "active"),
        (2, 1500, "active"),
        (3, 1200, "inactive"),
    ],
    columns=["customer_id", "balance", "status"],
)

# Base (pre-CDC) Spark DataFrame that later cells merge changes into.
df_base = spark.createDataFrame(initial_data)
df_base.show()


+-----------+-------+--------+
|customer_id|balance|  status|
+-----------+-------+--------+
|          1|   1000|  active|
|          2|   1500|  active|
|          3|   1200|inactive|
+-----------+-------+--------+



In [5]:
# First batch of change events, one tuple per event. Customer 2 already
# exists in the base snapshot (an update); customer 4 does not (an insert).
cdc_events = pd.DataFrame(
    [
        (2, 1800, "active"),
        (4, 900, "active"),
    ],
    columns=["customer_id", "balance", "status"],
)

# Spark view of the change batch, same column layout as the base table.
df_cdc = spark.createDataFrame(cdc_events)
df_cdc.show()


+-----------+-------+------+
|customer_id|balance|status|
+-----------+-------+------+
|          2|   1800|active|
|          4|    900|active|
+-----------+-------+------+



In [6]:
# Merge changes incrementally (upsert the CDC batch into the base snapshot).
#
# The full outer join on customer_id yields three kinds of rows:
#   * base-only  -> customer untouched by this batch, cdc.* columns are null
#   * cdc-only   -> brand-new customer, base.* columns are null
#   * matched    -> existing customer with an update
# For every output column, coalesce() returns the CDC value when it is
# non-null and the base value otherwise. This is exactly the
# when(cdc.isNotNull(), cdc).otherwise(base) rule, expressed once instead of
# being copy-pasted per column.
# Note: a null field inside a matched CDC row keeps the base value; it is
# treated as "no change", not as "set to null".
merge_columns = ["customer_id", "balance", "status"]

df_merged = (
    df_base.alias("base")
    .join(df_cdc.alias("cdc"), col("base.customer_id") == col("cdc.customer_id"), "outer")
    .select(
        *[
            coalesce(col(f"cdc.{name}"), col(f"base.{name}")).alias(name)
            for name in merge_columns
        ]
    )
)

df_merged.show()


+-----------+-------+--------+
|customer_id|balance|  status|
+-----------+-------+--------+
|          1|   1000|  active|
|          2|   1800|  active|
|          3|   1200|inactive|
|          4|    900|  active|
+-----------+-------+--------+



In [7]:
# Enrich the merged snapshot with a derived risk_score column:
# balance divided by 100 (true division, so the result is fractional,
# e.g. 1000 -> 10.0).
df_final = df_merged.withColumn(
    "risk_score",
    col("balance") / 100,
)
df_final.show()


+-----------+-------+--------+----------+
|customer_id|balance|  status|risk_score|
+-----------+-------+--------+----------+
|          1|   1000|  active|      10.0|
|          2|   1800|  active|      18.0|
|          3|   1200|inactive|      12.0|
|          4|    900|  active|       9.0|
+-----------+-------+--------+----------+



In [8]:
def _apply_cdc(base_df, cdc_df, key="customer_id", columns=("customer_id", "balance", "status")):
    """Upsert one CDC batch into a base snapshot.

    Performs a full outer join of ``base_df`` and ``cdc_df`` on ``key`` and,
    for each name in ``columns``, keeps the CDC value when it is non-null and
    the base value otherwise.

    Args:
        base_df: Current snapshot. Columns not listed in ``columns``
            (e.g. a previously derived ``risk_score``) are dropped.
        cdc_df: Change batch containing at least ``columns``.
        key: Name of the join column present in both frames.
        columns: Names of the columns to carry into the result.

    Returns:
        A new DataFrame containing exactly ``columns``.
    """
    return (
        base_df.alias("base")
        .join(cdc_df.alias("cdc"), col(f"base.{key}") == col(f"cdc.{key}"), "outer")
        .select(
            *[
                coalesce(col(f"cdc.{name}"), col(f"base.{name}")).alias(name)
                for name in columns
            ]
        )
    )


# Simulate three further CDC batches arriving one after another. Each batch
# introduces one new customer (ids 5, 6, 7) and is folded into the running
# snapshot held in df_final.
for batch in range(3):
    print(f"=== Batch {batch+1} ===")
    # New incremental change each loop
    new_change = pd.DataFrame({
        "customer_id": [batch + 5],
        "balance": [1000 + batch * 200],
        "status": ["active"]
    })
    df_cdc_new = spark.createDataFrame(new_change)
    # The merge drops the old risk_score; it is recomputed below from the
    # merged balance so it always reflects the latest values.
    df_merged = _apply_cdc(df_final, df_cdc_new)
    df_final = df_merged.withColumn("risk_score", col("balance") / 100)
    df_final.show()
    # Pause between batches (presumably to mimic the arrival interval of a
    # real CDC feed; it has no effect on the computed result).
    time.sleep(2)


=== Batch 1 ===
+-----------+-------+--------+----------+
|customer_id|balance|  status|risk_score|
+-----------+-------+--------+----------+
|          1|   1000|  active|      10.0|
|          2|   1800|  active|      18.0|
|          3|   1200|inactive|      12.0|
|          4|    900|  active|       9.0|
|          5|   1000|  active|      10.0|
+-----------+-------+--------+----------+

=== Batch 2 ===
+-----------+-------+--------+----------+
|customer_id|balance|  status|risk_score|
+-----------+-------+--------+----------+
|          1|   1000|  active|      10.0|
|          2|   1800|  active|      18.0|
|          3|   1200|inactive|      12.0|
|          4|    900|  active|       9.0|
|          5|   1000|  active|      10.0|
|          6|   1200|  active|      12.0|
+-----------+-------+--------+----------+

=== Batch 3 ===
+-----------+-------+--------+----------+
|customer_id|balance|  status|risk_score|
+-----------+-------+--------+----------+
|          1|   1000|  act