In [145]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col, expr, to_timestamp, to_date, current_timestamp, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

KAFKA_BROKER = "redpanda.kafka.svc:9092"
SCHEMA_REGISTRY_URL = "http://redpanda.kafka.svc:8081"

# Kafka Topics (compact topic for state)
STATE_TOPIC = "blockchain.ingestion._state"
TABLE_NAME = "bronze.kafka_ingestion_state"
SUBJECT = f"{STATE_TOPIC}-value"

avro_schema = requests.get(
    f"{SCHEMA_REGISTRY_URL}/subjects/{SUBJECT}/versions/latest"
).json()["schema"]


spark.conf.set("spark.sql.iceberg.write.distribution-mode", "hash")
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1_000_000)

In [148]:
df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", STATE_TOPIC)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

df_stripped = df.withColumn(
    "value_no_header",
    expr("substring(value, 6, length(value)-5)")
)

df_parsed = (
    df_stripped
    .select(
        # ===== Avro payload =====
        from_avro(
            col("value_no_header"),
            avro_schema,
            {"mode": "PERMISSIVE"}
        ).alias("r"),
        
        # ===== Kafka key =====
        col("key").cast("string").alias("kafka_key"),

        # ===== Kafka metadata =====
        col("topic").alias("kafka_topic"),
        col("partition").alias("kafka_partition"),
        col("offset").alias("kafka_offset"),
        col("timestamp").alias("kafka_timestamp")
    )
    .select(
        "r.*",
        "kafka_key",
        "kafka_topic",
        "kafka_partition",
        "kafka_offset",
        "kafka_timestamp"
    )
)

# convert string to timestamp
df_parsed_ts = (
    df_parsed
    .withColumn("kafka_date", to_date(col("kafka_timestamp")))
)

df_ordered = (
    df_parsed_ts
    .select(
        # ===== state core =====
        col("checkpoint").alias("checkpoint_block"),

        col("producer.pod_name").alias("producer_pod_name"),
        col("producer.pod_uid").alias("producer_pod_uid"),

        col("run.run_id").alias("run_id"),
        col("run.mode").alias("run_mode"),
        col("run.start_block").alias("run_start_block"),
        to_timestamp(col("run.started_at")).alias("run_started_at"),
        
        # ===== Kafka metadata =====
        "kafka_key",
        "kafka_topic",
        "kafka_partition",
        "kafka_offset",
        "kafka_timestamp",
        "kafka_date"
    )
    .withColumn("state_updated_at", current_timestamp())
)


window_latest = Window.partitionBy("kafka_key").orderBy(
    col("kafka_offset").desc()
)

df_latest = (
    df_ordered
    .withColumn("rn", row_number().over(window_latest))
    .filter(col("rn") == 1)
    .drop("rn")
)

df_latest.createOrReplaceTempView("state_updates")

spark.sql("""
MERGE INTO bronze.kafka_ingestion_state t
USING state_updates s
ON t.run_id = s.run_id

WHEN MATCHED THEN UPDATE SET
  t.checkpoint_block       = s.checkpoint_block,
  t.producer_pod_name      = s.producer_pod_name,
  t.producer_pod_uid       = s.producer_pod_uid,
  t.run_mode               = s.run_mode,
  t.run_start_block        = s.run_start_block,
  t.run_started_at         = s.run_started_at,
  t.kafka_key              = s.kafka_key,
  t.kafka_topic            = s.kafka_topic,
  t.kafka_partition        = s.kafka_partition,
  t.kafka_offset           = s.kafka_offset,
  t.kafka_timestamp        = s.kafka_timestamp,
  t.kafka_date             = s.kafka_date,
  t.state_updated_at       = s.state_updated_at

WHEN NOT MATCHED THEN INSERT *

""")

spark.sql("""
    with kafka_key_cte as (
        select 
            split(kafka_key, ':') AS parts,
            kafka_key,
            run_start_block as run_start,
            checkpoint_block as run_end,
            producer_pod_name,
            run_id,
            run_mode,
            kafka_timestamp
        from bronze.kafka_ingestion_state
    )
    select 
        parts[0] AS chain,
        parts[1] AS ingestion_entity,
        parts[2] AS ingestion_type,
        run_mode,
        run_start,
        run_end,
        producer_pod_name,
        run_id,
        kafka_timestamp,
        kafka_key
        from (
            select *, 
            row_number() over (partition by kafka_key order by run_end desc, kafka_timestamp desc, run_id desc) as rn
        from kafka_key_cte
        ) order by kafka_key, rn
""").createOrReplaceTempView("v_latest_ingestion_state")

In [None]:
# !pip install pandas

Collecting pandas
  Using cached pandas-3.0.0-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (79 kB)
Collecting numpy>=1.26.0 (from pandas)
  Using cached numpy-2.4.2-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (6.6 kB)
Using cached pandas-3.0.0-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (11.2 MB)
Using cached numpy-2.4.2-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (16.9 MB)
Installing collected packages: numpy, pandas
Successfully installed numpy-2.4.2 pandas-3.0.0


In [149]:
spark.sql("select * from v_latest_ingestion_state").toPandas()

Unnamed: 0,chain,ingestion_entity,ingestion_type,run_mode,run_start,run_end,producer_pod_name,run_id,kafka_timestamp,kafka_key
0,bsc,blocks,realtime,chain_head_resume,79438840,79439545,bsc-blocks-ingestion-5db5bfc7d9-mzr7l,b61501db-9303-4ad9-9366-f8f14cbb3d47,2026-02-05 12:09:20.740,bsc:blocks:realtime
1,bsc,blocks,realtime,chain_head_resume,79430917,79438372,bsc-blocks-ingestion-5db5bfc7d9-mzr7l,8a2ee0ff-b49b-409e-8d2a-19810db3c51f,2026-02-05 12:00:53.506,bsc:blocks:realtime
2,bsc,blocks,realtime,chain_head_resume,79412001,79416288,bsc-blocks-ingestion-5db5bfc7d9-mzr7l,234574e2-7196-4ce8-9869-34b352bc5028,2026-02-05 09:14:48.655,bsc:blocks:realtime
3,bsc,blocks,realtime,chain_head_resume,79406931,79408433,bsc-blocks-ingestion-6669546d46-dcx6g,385ae973-c93c-4c39-8e82-21dd322a9a56,2026-02-05 08:15:52.709,bsc:blocks:realtime
4,bsc,logs,realtime,chain_head_resume,79439374,79439546,bsc-logs-ingestion-6c6df4f66-jdxf9,2132ae46-d5ee-4102-bcb7-a7dbf6e40b35,2026-02-05 12:09:19.691,bsc:logs:realtime
5,bsc,logs,realtime,chain_head_resume,79412002,79423691,bsc-logs-ingestion-69799c448-bqg4p,9c301d60-0703-4957-92b1-770bc8f055a3,2026-02-05 10:11:34.530,bsc:logs:realtime
6,bsc,logs,realtime,chain_head_resume,79405757,79405865,bsc-logs-ingestion-7f9b495d5f-fg499,795c9713-a01a-46ae-aea4-0dfa3870d52f,2026-02-05 07:56:39.632,bsc:logs:realtime


In [126]:
# lazy execution
# df_latest.show()

In [127]:
spark.sql("""
    with kafka_key_cte as (
        select 
            split(kafka_key, ':') AS parts,
            kafka_key,
            run_start_block as run_start,
            checkpoint_block as run_end,
            producer_pod_name,
            run_id,
            run_mode,
            kafka_timestamp
        from bronze.kafka_ingestion_state
    )
    select 
        parts[0] AS chain,
        parts[1] AS ingestion_entity,
        parts[2] AS ingestion_type,
        run_mode,
        run_start,
        run_end,
        producer_pod_name,
        run_id,
        kafka_timestamp,
        kafka_key
        from (
            select *, 
            row_number() over (partition by kafka_key order by run_end desc, kafka_timestamp desc, run_id desc) as rn
        from kafka_key_cte
        ) order by kafka_key, rn
""").show(truncate=False)

+-----+----------------+--------------+-----------------+---------+--------+-------------------------------------+------------------------------------+-----------------------+-------------------+
|chain|ingestion_entity|ingestion_type|run_mode         |run_start|run_end |producer_pod_name                    |run_id                              |kafka_timestamp        |kafka_key          |
+-----+----------------+--------------+-----------------+---------+--------+-------------------------------------+------------------------------------+-----------------------+-------------------+
|bsc  |blocks          |realtime      |chain_head_resume|79412001 |79415700|bsc-blocks-ingestion-5db5bfc7d9-mzr7l|234574e2-7196-4ce8-9869-34b352bc5028|2026-02-05 09:10:24.042|bsc:blocks:realtime|
|bsc  |blocks          |realtime      |chain_head_resume|79406931 |79408433|bsc-blocks-ingestion-6669546d46-dcx6g|385ae973-c93c-4c39-8e82-21dd322a9a56|2026-02-05 08:15:52.709|bsc:blocks:realtime|
|bsc  |logs         

In [91]:
spark.sql("""
          delete from bronze.kafka_ingestion_state where kafka_key like "%.%"
          """)

DataFrame[]

In [144]:
spark.conf.get("spark.sql.catalog.spark_catalog.type")

'hadoop'