In [1]:
import pandas as pd

df = pd.read_pickle('../data/raw/transaction/2025-01-31.pkl')

In [2]:
df.head()

Unnamed: 0,TRANSACTION_ID,TX_DATETIME,CUSTOMER_ID,TERMINAL_ID,TX_AMOUNT,TX_TIME_SECONDS,TX_TIME_DAYS,TX_FRAUD,TX_FRAUD_SCENARIO
2338640,2338640,2025-01-31 00:00:10,1199,502,17.91,21081610,244,0,0
2338641,2338641,2025-01-31 00:00:12,1949,3265,77.32,21081612,244,0,0
2338642,2338642,2025-01-31 00:00:45,1953,6481,72.42,21081645,244,0,0
2338643,2338643,2025-01-31 00:02:15,107,9548,23.26,21081735,244,0,0
2338644,2338644,2025-01-31 00:02:34,1177,8614,62.4,21081754,244,0,0


In [6]:
# filter thos rows where customerid is between 0-9 and temrinal_id is between 0-5
df_filtered = df[(df['CUSTOMER_ID'] >= 0) & (df['CUSTOMER_ID'] <= 9)]

In [7]:
df_filtered.head()

Unnamed: 0,TRANSACTION_ID,TX_DATETIME,CUSTOMER_ID,TERMINAL_ID,TX_AMOUNT,TX_TIME_SECONDS,TX_TIME_DAYS,TX_FRAUD,TX_FRAUD_SCENARIO
2339069,2339069,2025-01-31 03:15:46,7,4322,12.5,21093346,244,0,0
2339751,2339751,2025-01-31 05:42:44,1,943,63.94,21102164,244,0,0
2339921,2339921,2025-01-31 06:06:59,0,1530,24.17,21103619,244,0,0
2340643,2340643,2025-01-31 07:33:53,4,5251,22.42,21108833,244,0,0
2340731,2340731,2025-01-31 07:44:47,4,2225,79.2,21109487,244,0,0


In [8]:
df_filtered.to_dict(orient='records')

[{'TRANSACTION_ID': 2339069,
  'TX_DATETIME': Timestamp('2025-01-31 03:15:46'),
  'CUSTOMER_ID': 7,
  'TERMINAL_ID': 4322,
  'TX_AMOUNT': 12.5,
  'TX_TIME_SECONDS': 21093346,
  'TX_TIME_DAYS': 244,
  'TX_FRAUD': 0,
  'TX_FRAUD_SCENARIO': 0},
 {'TRANSACTION_ID': 2339751,
  'TX_DATETIME': Timestamp('2025-01-31 05:42:44'),
  'CUSTOMER_ID': 1,
  'TERMINAL_ID': 943,
  'TX_AMOUNT': 63.94,
  'TX_TIME_SECONDS': 21102164,
  'TX_TIME_DAYS': 244,
  'TX_FRAUD': 0,
  'TX_FRAUD_SCENARIO': 0},
 {'TRANSACTION_ID': 2339921,
  'TX_DATETIME': Timestamp('2025-01-31 06:06:59'),
  'CUSTOMER_ID': 0,
  'TERMINAL_ID': 1530,
  'TX_AMOUNT': 24.17,
  'TX_TIME_SECONDS': 21103619,
  'TX_TIME_DAYS': 244,
  'TX_FRAUD': 0,
  'TX_FRAUD_SCENARIO': 0},
 {'TRANSACTION_ID': 2340643,
  'TX_DATETIME': Timestamp('2025-01-31 07:33:53'),
  'CUSTOMER_ID': 4,
  'TERMINAL_ID': 5251,
  'TX_AMOUNT': 22.42,
  'TX_TIME_SECONDS': 21108833,
  'TX_TIME_DAYS': 244,
  'TX_FRAUD': 0,
  'TX_FRAUD_SCENARIO': 0},
 {'TRANSACTION_ID': 2340731,
 

# production code

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, StructType, StructField


In [None]:
## DEFINE SENSITIVE VARIABLES
CATALOG_URI = "http://localhost:19120/api/v1"  # Nessie Server URI
WAREHOUSE = "s3a://commerce/"  # S3 Address to Write to
STORAGE_URI = "http://localhost:9000"
KAFKA_BROKERS = 'localhost:9092'
S3_ENDPOINT = "http://localhost:9000"
CHECKPOINT_PATH = "s3a://commerce/checkpoints/debezium.payment.customers/"

# S3 Credentials
S3_ACCESS_KEY = "minio"
S3_SECRET_KEY = "minio123"

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        # Packages
        .set('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,'
                                    'org.apache.hadoop:hadoop-aws:3.3.2,'
                                    'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,'
                                    'org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1')
        # SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,'
                                     'org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        # Configuring Catalog
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', CATALOG_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.warehouse', WAREHOUSE)
        .set('spark.sql.catalog.nessie.s3.secret.region', 'us-east-1')

        # S3 Configuration
        .set('spark.hadoop.fs.s3a.endpoint', S3_ENDPOINT)
        .set('spark.hadoop.fs.s3a.access.key', S3_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', S3_SECRET_KEY)
        .set('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
        .set('spark.hadoop.fs.s3a.path.style.access', 'true')
        .set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')  # Disable SSL if using MinIO locally
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")


your 131072x1 screen size is bogus. expect trouble
25/02/13 19:48:01 WARN Utils: Your hostname, GTMSI resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/02/13 19:48:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/starlord/.cache/pypoetry/virtualenvs/realtime-fraud-detection-system-jsIdL3x4-py3.11/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/starlord/.ivy2/cache
The jars for the packages stored in: /home/starlord/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-56ccfd03-fe2c-4b7d-8736-798cff42e55a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache

Spark Running


In [None]:
# from pyspark.sql import SparkSession
# spark = (SparkSession.builder
#                 .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.hadoop:hadoop-aws:3.3.2,,8")
#                 .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") 
#                 .config("spark.hadoop.fs.s3a.access.key", "minio") 
#                 .config("spark.hadoop.fs.s3a.secret.key", "minio123") 
#                 .config("spark.hadoop.fs.s3a.path.style.access", "true") 
#                 .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
#                 .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
#                 .config("spark.sql.catalog.spark_catalog.type", "hadoop")
#                 .config("spark.sql.catalog.spark_catalog.warehouse", "s3a://commerce/warehouse/") 
#                 .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") 
#                 .appName("Kafka to MinIO Pipeline")
#          .getOrCreate())

25/02/10 22:13:04 WARN Utils: Your hostname, Gauravs-MacBook-Air-3.local resolves to a loopback address: 127.0.0.1; using 192.168.1.10 instead (on interface en0)
25/02/10 22:13:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/saurabh/Library/Caches/pypoetry/virtualenvs/realtime-fraud-detection-system-eUiKKk2v-py3.13/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/saurabh/.ivy2/cache
The jars for the packages stored in: /Users/saurabh/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ba0a1697-3bdc-474f-b48a-19ff0897d01d;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in ce

In [3]:
def execute_query(query: str):
    spark.sql(query).show(truncate=False)

In [None]:
execute_query("""
  CREATE TABLE IF NOT EXISTS nessie.payment.customer (
      customer_id INT,
      x_location FLOAT,
      y_location FLOAT,
      row_created_timestamp TIMESTAMP,
      row_updated_timestamp TIMESTAMP
)
USING iceberg
""")

25/02/13 19:48:29 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


++
||
++
++



In [4]:
execute_query("show catalogs")

+-------------+
|catalog      |
+-------------+
|spark_catalog|
+-------------+



In [5]:
execute_query("show tables in nessie.default")

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [5]:
# Kafka configurations
kafka_brokers = "localhost:9093"  # Update with your Kafka broker address
kafka_topic = "debezium.payment.customers"
# Read data from Kafka (batch mode)
kafka_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

In [6]:
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
kafka_df.show()

25/02/13 19:49:07 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
25/02/13 19:49:07 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
25/02/13 19:49:07 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
25/02/13 19:49:07 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
25/02/13 19:49:07 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
|                 key|               value|               topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
|[7B 22 73 63 68 6...|[7B 22 73 63 68 6...|debezium.payment....|        0|     0|2025-02-13 19:31:...|            0|
|[7B 22 73 63 68 6...|[7B 22 73 63 68 6...|debezium.payment....|        0|     1|2025-02-13 19:31:...|            0|
|[7B 22 73 63 68 6...|[7B 22 73 63 68 6...|debezium.payment....|        0|     2|2025-02-13 19:31:...|            0|
|[7B 22 73 63 68 6...|[7B 22 73 63 68 6...|debezium.payment....|        0|     3|2025-02-13 19:31:...|            0|
|[7B 22 73 63 68 6...|[7B 22 73 63 68 6...|debezium.payment....|        0|     4|2025-02-13 19:31:...|            0|
|[7B 22 73 63 68 6...|[7B 22 73 63 68 6...|debezium.payment....|

                                                                                

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, LongType

# Define the schema for the JSON messages
column_schema = StructType([
StructField("schema", StringType(), False),
StructField("payload", StructType([
    StructField("before", StructType([
        StructField("customer_id", IntegerType(), False),
        StructField("x_location", DoubleType(), False),
        StructField("y_location", DoubleType(), False)
    ]), True),
    StructField("after", StructType([
        StructField("customer_id", IntegerType(), False),
        StructField("x_location", DoubleType(), False),
        StructField("y_location", DoubleType(), False)
    ]), True),
    StructField("source", StructType([
        StructField("version", StringType(), False),
        StructField("connector", StringType(), False),
        StructField("name", StringType(), False),
        StructField("ts_ms", LongType(), False),
        StructField("snapshot", StringType(), True),
        StructField("db", StringType(), False),
        StructField("sequence", StringType(), True),
        StructField("ts_us", LongType(), True),
        StructField("ts_ns", LongType(), True),
        StructField("schema", StringType(), False),
        StructField("table", StringType(), False),
        StructField("txId", LongType(), True),
        StructField("lsn", LongType(), True),
        StructField("xmin", LongType(), True)
    ]), False),
    StructField("transaction", StructType([
        StructField("id", StringType(), False),
        StructField("total_order", LongType(), False),
        StructField("data_collection_order", LongType(), False)
    ]), True),
    StructField("op", StringType(), False),
    StructField("ts_ms", LongType(), True),
    StructField("ts_us", LongType(), True),
    StructField("ts_ns", LongType(), True)
]))])

from pyspark.sql.functions import from_json

kafka_df = kafka_df.withColumn("value", col("value").cast(StringType())) \
    .withColumn("key", col("key").cast(StringType()))
# Apply the schema to the DataFrame
kafka_df = kafka_df.withColumn("value", from_json("value", column_schema))
kafka_df.createOrReplaceTempView("kafkadata")

: 

In [None]:
execute_query("select * from kafkadata")

25/02/13 19:44:21 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
25/02/13 19:44:21 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
25/02/13 19:44:21 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
25/02/13 19:44:21 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
25/02/13 19:44:21 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.


+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

: 

In [11]:
execute_query(""" create or replace temp view latest_data as 
              with extracted_data  as (
                    select value.payload.after.customer_id,
                    value.payload.after.x_location, 
                    value.payload.after.y_location,
                    timestamp
                    from kafkadata),
              ranked_data as (
                  select customer_id,
                  x_location,
                  y_location, 
                  timestamp,
                  ROW_NUMBER() over (partition by customer_id order by timestamp DESC) as rn
                  from extracted_data) 
              select customer_id,
                  x_location,
                  y_location,
                  now() as row_created_timestamp,
                  now() as row_updated_timestamp
              from ranked_data 
              where rn=1""")

++
||
++
++



In [12]:
execute_query("CREATE DATABASE IF NOT EXISTS nessie.payment;")
execute_query("show databases")

++
||
++
++

+---------+
|namespace|
+---------+
|default  |
+---------+



In [13]:
execute_query("""
        CREATE TABLE IF NOT EXISTS nessie.payment.customer (
      customer_id INT,
      x_location FLOAT,
      y_location FLOAT,
      row_created_timestamp TIMESTAMP,
      row_updated_timestamp TIMESTAMP
)
USING iceberg
""")

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


Py4JJavaError: An error occurred while calling o63.sql.
: software.amazon.awssdk.core.exception.SdkClientException: Unable to load region from any of the providers in the chain software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain@b4c2a86: [software.amazon.awssdk.regions.providers.SystemSettingsRegionProvider@1058bf35: Unable to load region from system settings. Region must be specified either via environment variable (AWS_REGION) or  system property (aws.region)., software.amazon.awssdk.regions.providers.AwsProfileRegionProvider@6989cf10: No region provided in profile: default, software.amazon.awssdk.regions.providers.InstanceProfileRegionProvider@54107b8e: Unable to contact EC2 metadata service.]
	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
	at software.amazon.awssdk.regions.providers.AwsRegionProviderChain.getRegion(AwsRegionProviderChain.java:70)
	at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.resolveRegion(AwsDefaultClientBuilder.java:293)
	at software.amazon.awssdk.utils.AttributeMap$DerivedValue.primeCache(AttributeMap.java:600)
	at software.amazon.awssdk.utils.AttributeMap$DerivedValue.get(AttributeMap.java:589)
	at software.amazon.awssdk.utils.AttributeMap$Builder.resolveValue(AttributeMap.java:396)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at software.amazon.awssdk.utils.AttributeMap$Builder.build(AttributeMap.java:362)
	at software.amazon.awssdk.core.client.config.SdkClientConfiguration$Builder.build(SdkClientConfiguration.java:232)
	at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.finalizeAwsConfiguration(AwsDefaultClientBuilder.java:184)
	at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.finalizeChildConfiguration(AwsDefaultClientBuilder.java:161)
	at software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder.syncClientConfiguration(SdkDefaultClientBuilder.java:188)
	at software.amazon.awssdk.services.s3.DefaultS3ClientBuilder.buildClient(DefaultS3ClientBuilder.java:37)
	at software.amazon.awssdk.services.s3.DefaultS3ClientBuilder.buildClient(DefaultS3ClientBuilder.java:26)
	at software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder.build(SdkDefaultClientBuilder.java:155)
	at org.apache.iceberg.aws.AwsClientFactories$DefaultAwsClientFactory.s3(AwsClientFactories.java:116)
	at org.apache.iceberg.aws.s3.S3FileIO.client(S3FileIO.java:327)
	at org.apache.iceberg.aws.s3.S3FileIO.newOutputFile(S3FileIO.java:135)
	at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadata(BaseMetastoreTableOperations.java:165)
	at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadataIfRequired(BaseMetastoreTableOperations.java:160)
	at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:115)
	at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:135)
	at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:201)
	at org.apache.iceberg.CachingCatalog$CachingTableBuilder.lambda$create$0(CachingCatalog.java:261)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.iceberg.CachingCatalog$CachingTableBuilder.create(CachingCatalog.java:257)
	at org.apache.iceberg.spark.SparkCatalog.createTable(SparkCatalog.java:247)
	at org.apache.spark.sql.connector.catalog.TableCatalog.createTable(TableCatalog.java:223)
	at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:44)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)


In [None]:
execute_query("""
MERGE INTO nessie.payment.customer AS target
USING latest_data AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN
  UPDATE SET
    target.x_location = source.x_location,
    target.y_location = source.y_location,
    target.row_updated_timestamp = source.row_updated_timestamp
WHEN NOT MATCHED THEN
  INSERT (
    customer_id,
    x_location,
    y_location,
    row_created_timestamp,
    row_updated_timestamp
  )
  VALUES (
    source.customer_id,
    source.x_location,
    source.y_location,
    source.row_created_timestamp,
    source.row_updated_timestamp
  )
""")

25/02/11 00:23:07 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
25/02/11 00:23:07 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
25/02/11 00:23:07 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
25/02/11 00:23:07 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
25/02/11 00:23:07 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
25/02/11 00:23:08 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
25/02/11 00:23:08 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
25/02/11 00:23:08 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
25/02/11 00:23:08 WARN AdminClientConfig: The c

++
||
++
++



In [83]:
execute_query("""select * from payment.customer  """)

+-----------+----------+----------+--------------------------+--------------------------+
|customer_id|x_location|y_location|row_created_timestamp     |row_updated_timestamp     |
+-----------+----------+----------+--------------------------+--------------------------+
|0          |54.88135  |71.518936 |2025-02-11 00:23:07.265899|2025-02-11 00:23:07.265899|
|1          |60.276337 |54.48832  |2025-02-11 00:23:07.265899|2025-02-11 00:23:07.265899|
|2          |50.4567   |64.58941  |2025-02-11 00:23:07.265899|2025-02-11 00:23:07.265899|
|3          |64.58941  |43.75872  |2025-02-11 00:23:07.265899|2025-02-11 00:23:07.265899|
|4          |43.75872  |89.1773   |2025-02-11 00:23:07.265899|2025-02-11 00:23:07.265899|
|5          |89.1773   |96.36628  |2025-02-11 00:23:07.265899|2025-02-11 00:23:07.265899|
|6          |96.36628  |38.34415  |2025-02-11 00:23:07.265899|2025-02-11 00:23:07.265899|
|7          |38.34415  |79.1725   |2025-02-11 00:23:07.265899|2025-02-11 00:23:07.265899|
|8        

In [75]:
execute_query("""select * from payment.customers """)

+-----------+----------+----------+
|customer_id|x_location|y_location|
+-----------+----------+----------+
|0          |54.88135  |71.518937 |
|1          |60.276338 |54.488318 |
|2          |50.4567   |64.589411 |
|3          |64.589411 |43.758721 |
|4          |43.758721 |89.1773   |
|5          |89.1773   |96.366276 |
|6          |96.366276 |38.344152 |
|7          |38.344152 |79.172504 |
|8          |79.172504 |52.889492 |
|9          |52.889492 |56.804456 |
|10         |40.7128   |74.006    |
+-----------+----------+----------+



In [None]:
postgres -> debezium -> kafka (retention policy - 7 days ) -> spark streaming -> minio (s3)

In [None]:
kafka -> topic -> transform -> s3 table (create / replace )

incremental processing 

1. kafka -> event -> 

incremental read kafka basis offset -> spark streaming -> checkpointing (state store )

triggers -> freuently job execution





done this:
10 row -> updates + inserts + delete  upsert -> key column 

merge command (iceberg)


table - 100 rows 

8 am 

