# Kafka consume

In [6]:
from confluent_kafka import Consumer, KafkaException
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
from typing import List, Dict

# Consumer settings for the local broker. Auto-commit is switched off here;
# consume_kafka_messages commits explicitly after each message it keeps.
KAFKA_CONFIG = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "notebook-consumer-group",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
}

# Schema Registry endpoint backing the Avro deserializer that is applied to
# message values further down.
SCHEMA_CONFIG = {"url": "http://localhost:8081"}
schema_registry_client = SchemaRegistryClient(SCHEMA_CONFIG)
avro_deserializer = AvroDeserializer(schema_registry_client)


def consume_kafka_messages(
    topic: str, max_messages: int, max_empty_polls: int = 15
) -> List[Dict]:
    """Read up to ``max_messages`` Avro-decoded values from a Kafka topic.

    Args:
        topic: Topic to subscribe to; also used as the serialization context
            when decoding message values.
        max_messages: Stop as soon as this many decoded values were collected.
        max_empty_polls: Give up after this many consecutive polls (2 seconds
            each) that returned no message, and return what was collected so
            far. Pass 0 or a negative number to wait indefinitely.

    Returns:
        The decoded message values, in the order they were received. May hold
        fewer than ``max_messages`` items when the topic runs dry.

    Raises:
        KafkaException: If a polled message carries an error.
    """
    consumer = Consumer(KAFKA_CONFIG)
    messages = []
    empty_polls = 0

    try:
        # Subscribing inside the try block guarantees the consumer is closed
        # even when the subscription itself fails.
        consumer.subscribe([topic])
        print(f"Subscribed to topic: {topic}")
        print(f"Waiting for up to {max_messages} Avro-encoded messages...")

        while len(messages) < max_messages:
            msg = consumer.poll(timeout=2.0)

            if msg is None:
                print("No message received.")
                empty_polls += 1
                # Without this cap the cell would spin forever on a topic
                # holding fewer than max_messages records.
                if 0 < max_empty_polls <= empty_polls:
                    print(
                        f"Giving up after {empty_polls} empty polls; "
                        f"returning {len(messages)} message(s)."
                    )
                    break
                continue

            empty_polls = 0
            if msg.error():
                raise KafkaException(msg.error())

            value = avro_deserializer(
                msg.value(), SerializationContext(topic, MessageField.VALUE)
            )

            # Values that decode to None are skipped and not committed.
            if value is not None:
                messages.append(value)
                print(f"Received message {len(messages)} of {max_messages}")
                consumer.commit(msg)

    finally:
        consumer.close()

    return messages


# Pull five change events from the orders topic and display them as the cell result.
messages = consume_kafka_messages("storefront.public.orders", 5)
messages

Subscribed to topic: storefront.public.orders
Waiting for up to 5 Avro-encoded messages...
No message received.
Received message 1 of 5
Received message 2 of 5
Received message 3 of 5
Received message 4 of 5
Received message 5 of 5


[{'before': None,
  'after': {'id': 1,
   'customer_id': 473,
   'status': 'cancelled',
   'total': Decimal('1124.84'),
   'created_at': 1743859191675403},
  'source': {'version': '2.5.4.Final',
   'connector': 'postgresql',
   'name': 'storefront',
   'ts_ms': 1743859335320,
   'snapshot': 'first_in_data_collection',
   'db': 'storefront',
   'sequence': '[null,"40034744"]',
   'schema': 'public',
   'table': 'orders',
   'txId': 776,
   'lsn': 40034744,
   'xmin': None},
  'op': 'r',
  'ts_ms': 1743859344573,
  'transaction': None},
 {'before': None,
  'after': {'id': 2,
   'customer_id': 152,
   'status': 'delivered',
   'total': Decimal('1997.27'),
   'created_at': 1743859191695691},
  'source': {'version': '2.5.4.Final',
   'connector': 'postgresql',
   'name': 'storefront',
   'ts_ms': 1743859335320,
   'snapshot': 'true',
   'db': 'storefront',
   'sequence': '[null,"40034744"]',
   'schema': 'public',
   'table': 'orders',
   'txId': 776,
   'lsn': 40034744,
   'xmin': None},
 

In [25]:
# Show only the "after" payload of the first consumed event.
messages[0]["after"]

{'id': 1,
 'customer_id': 473,
 'status': 'cancelled',
 'total': Decimal('1124.84'),
 'created_at': 1743859191675403}

# Read S3

In [1]:
# setup

import os
import boto3
import fastavro
from smart_open import open as smart_open
from dotenv import load_dotenv
from typing import List, Dict

# Load the two env files; they presumably define the variables read via
# os.getenv below (verify against the files themselves).
load_dotenv("../.streaming.env")
load_dotenv("../.lake.env")

MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT")
MINIO_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID")
MINIO_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

# boto3 S3 client pointed at MINIO_ENDPOINT. The helper functions in the
# following cells use this module-level client directly.
s3 = boto3.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY,
    region_name="us-east-1",
)

In [None]:
def read_sample_avro_records(
    bucket: str, topic_prefix: str, max_records: int = 5
) -> List[Dict]:
    """Return the first records of the first Avro file found under a prefix.

    Lists the objects below ``topic_prefix`` with a single ``list_objects_v2``
    call (no pagination, so the printed file count only covers the first page
    of results), prints up to five of the ``.avro`` keys, and reads records
    from the first one.

    Args:
        bucket: Bucket to list and read from.
        topic_prefix: Key prefix to search for ``.avro`` objects.
        max_records: Maximum number of records to return; zero or a negative
            value yields an empty list.

    Returns:
        Up to ``max_records`` decoded records, or an empty list when no
        ``.avro`` object exists under the prefix.
    """
    response = s3.list_objects_v2(Bucket=bucket, Prefix=topic_prefix)
    files = [
        obj["Key"]
        for obj in response.get("Contents", [])
        if obj["Key"].endswith(".avro")
    ]

    print(
        f"Found {len(files)} Avro files in bucket '{bucket}' with prefix '{topic_prefix}'; top 5:"
    )
    for file_key in files[:5]:
        print(" -", file_key)

    if not files:
        return []

    records = []
    with smart_open(
        f"s3://{bucket}/{files[0]}", "rb", transport_params={"client": s3}
    ) as avro_file:
        # zip() stops as soon as the range is exhausted, so the cap holds for
        # every value of max_records (including 0) and no extra record is
        # pulled from the reader.
        for _, record in zip(range(max_records), fastavro.reader(avro_file)):
            records.append(record)

    return records


# Sample up to ten records from the first Avro file under the orders prefix
# and display them as the cell result.
records = read_sample_avro_records(
    "raw", "kafka/storefront.public.orders", max_records=10
)
records

Found 10 Avro files in bucket 'raw' with prefix 'kafka/storefront.public.orders'; top 5:
 - kafka/storefront.public.orders/partition=0/storefront.public.orders+0+0000000000.avro
 - kafka/storefront.public.orders/partition=0/storefront.public.orders+0+0000001000.avro
 - kafka/storefront.public.orders/partition=0/storefront.public.orders+0+0000002000.avro
 - kafka/storefront.public.orders/partition=0/storefront.public.orders+0+0000003000.avro
 - kafka/storefront.public.orders/partition=0/storefront.public.orders+0+0000004000.avro


[{'before': None,
  'after': {'id': 1,
   'customer_id': 473,
   'status': 'cancelled',
   'total': Decimal('1124.84'),
   'created_at': 1743859191675403},
  'source': {'version': '2.5.4.Final',
   'connector': 'postgresql',
   'name': 'storefront',
   'ts_ms': 1743859335320,
   'snapshot': 'first_in_data_collection',
   'db': 'storefront',
   'sequence': '[null,"40034744"]',
   'schema': 'public',
   'table': 'orders',
   'txId': 776,
   'lsn': 40034744,
   'xmin': None},
  'op': 'r',
  'ts_ms': 1743859344573,
  'transaction': None},
 {'before': None,
  'after': {'id': 2,
   'customer_id': 152,
   'status': 'delivered',
   'total': Decimal('1997.27'),
   'created_at': 1743859191695691},
  'source': {'version': '2.5.4.Final',
   'connector': 'postgresql',
   'name': 'storefront',
   'ts_ms': 1743859335320,
   'snapshot': 'true',
   'db': 'storefront',
   'sequence': '[null,"40034744"]',
   'schema': 'public',
   'table': 'orders',
   'txId': 776,
   'lsn': 40034744,
   'xmin': None},
 

In [7]:
# Check row count postgres == avro count s3

import pandas as pd
import psycopg
from dotenv import load_dotenv

load_dotenv("../.source.env", override=True)

tables = ["customers", "order_items", "orders", "payments", "products"]
bucket = "raw"
prefix_template = "kafka/storefront.public.{}"

# Row counts taken from the source database, keyed by table name.
tables_actual_counts = {}

# Using the connection as a context manager closes it when the block exits,
# so re-running this cell no longer leaves a session open on the server.
with psycopg.connect(
    host="localhost",
    port="4444",
    dbname="storefront",
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
) as conn:
    with conn.cursor() as cur:
        for table in tables:
            # Table names come from the fixed list above, never from user
            # input, so interpolating them into the statement is safe here.
            cur.execute(f"SELECT COUNT(*) FROM public.{table}")
            tables_actual_counts[table] = cur.fetchone()[0]


def count_avro_records(bucket: str, prefix: str) -> int:
    """Count the records stored in every ``.avro`` object under a prefix.

    The listing is paginated, so prefixes holding more objects than a single
    ``list_objects_v2`` response can carry are counted completely instead of
    being silently truncated to the first page.

    Args:
        bucket: Bucket to list and read from.
        prefix: Key prefix to search. Every key starting with this string is
            considered, so a prefix without a trailing slash also matches
            sibling prefixes that share the same leading characters.

    Returns:
        Total number of records across all matching Avro files.
    """
    paginator = s3.get_paginator("list_objects_v2")

    total_records = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            file_key = obj["Key"]
            if not file_key.endswith(".avro"):
                continue
            with smart_open(
                f"s3://{bucket}/{file_key}", "rb", transport_params={"client": s3}
            ) as avro_file:
                # Stream the file and count records without keeping them.
                total_records += sum(1 for _ in fastavro.reader(avro_file))
    return total_records


# Build one comparison row per table: source row count vs. records landed in S3.
record_counts = []
for table, expected in tables_actual_counts.items():
    actual = count_avro_records(bucket, prefix_template.format(table))
    comparison_row = {
        "table": table,
        "postgres_count": expected,
        "s3_count": actual,
        "match": expected == actual,
    }
    record_counts.append(comparison_row)

df_counts = pd.DataFrame(record_counts)
df_counts

Unnamed: 0,table,postgres_count,s3_count,match
0,customers,5000,5000,True
1,order_items,24991,24991,True
2,orders,10000,10000,True
3,payments,10000,10000,True
4,products,10000,10000,True


## Create dbt-like bronze ingest YAML

In [None]:
import os
from decimal import Decimal
from typing import Any, Dict, List, Optional

import fastavro
import yaml
from smart_open import open as smart_open

# Bucket and key prefix that are scanned for topic directories, and the local
# path the generated YAML document is written to.
BUCKET = "raw"
ROOT_PREFIX = "kafka/"
OUT_PATH = "output/bronze_tables.yaml"


def list_all_keys(bucket: str, prefix: str) -> List[str]:
    """Collect every object key under ``prefix``, following continuation tokens.

    Args:
        bucket: Bucket to list.
        prefix: Key prefix to restrict the listing to.

    Returns:
        All keys returned by the listing, in the order the pages delivered them.
    """
    collected: List[str] = []
    next_token: Optional[str] = None
    while True:
        # Only send a continuation token once a previous page supplied one.
        paging = {"ContinuationToken": next_token} if next_token else {}
        page = s3.list_objects_v2(
            Bucket=bucket, Prefix=prefix, MaxKeys=1000, **paging
        )
        collected.extend(entry["Key"] for entry in page.get("Contents", []))
        if not page.get("IsTruncated"):
            return collected
        next_token = page.get("NextContinuationToken")


def topic_from_key(key: str) -> Optional[str]:
    """Extract the first path segment that follows ``ROOT_PREFIX`` in a key.

    Returns ``None`` when the key does not start with ``ROOT_PREFIX``. The
    segment may be an empty string when nothing follows the prefix.
    """
    if key.startswith(ROOT_PREFIX):
        remainder = key[len(ROOT_PREFIX) :]
        # Everything before the first "/" (or the whole remainder if there is none).
        segment, _sep, _rest = remainder.partition("/")
        return segment
    return None


def first_avro_key_for_topic(keys: List[str], topic: str) -> Optional[str]:
    """Return the first ``.avro`` key located under the topic's directory.

    Keys are examined in the order given; ``None`` is returned when no key
    both starts with ``ROOT_PREFIX + topic + "/"`` and ends with ``.avro``.
    """
    topic_dir = f"{ROOT_PREFIX}{topic}/"
    matching = (
        candidate
        for candidate in keys
        if candidate.startswith(topic_dir) and candidate.endswith(".avro")
    )
    return next(matching, None)


def infer_iceberg_type(py_value: Any) -> str:
    """Map a sampled Python value to an Iceberg type name.

    Args:
        py_value: A single field value taken from a decoded record.

    Returns:
        One of ``"boolean"``, ``"bigint"``, ``"double"``, ``"binary"``,
        ``"decimal(p,s)"`` or ``"string"``. ``None``, non-finite decimals and
        any otherwise unrecognised value map to ``"string"``.
    """
    if py_value is None:
        return "string"
    # bool is a subclass of int, so it has to be tested first.
    if isinstance(py_value, bool):
        return "boolean"
    if isinstance(py_value, int):
        return "bigint"
    if isinstance(py_value, float):
        return "double"
    if isinstance(py_value, Decimal):
        _sign, digits, exponent = py_value.as_tuple()
        # NaN / Infinity report a non-integer exponent; they carry no usable
        # precision, so treat them like an unknown value instead of crashing.
        if not isinstance(exponent, int):
            return "string"
        if exponent > 0:
            # e.g. Decimal("1E+2"): no fractional digits, and the exponent
            # adds integer digits. A negative scale would be an invalid type.
            scale = 0
            digit_count = len(digits) + exponent
        else:
            scale = -exponent
            digit_count = len(digits)
        precision = max(scale + digit_count, 1)
        return f"decimal({min(precision, 38)},{min(scale, 38)})"
    if isinstance(py_value, (bytes, bytearray)):
        return "binary"
    return "string"


def merge_types(t1: str, t2: str) -> str:
    """Combine two inferred type names into the one that should be kept.

    Types are ranked by the ``order`` list below; the type appearing earlier
    in that list wins. When both inputs are parameterised decimals the result
    is widened to fit both (largest integer-digit count and largest scale,
    capped at precision 38) instead of simply keeping the second one.

    Args:
        t1: Type kept so far for a field.
        t2: Type inferred from the next sampled value.

    Returns:
        The merged type name.

    Raises:
        ValueError: If either type name is not present in ``order``.
    """

    def base(t: str) -> str:
        return "decimal" if t.startswith("decimal") else t

    def decimal_params(t: str):
        """Return (precision, scale) from 'decimal(p,s)', or None if not parseable."""
        if not (t.startswith("decimal(") and t.endswith(")")):
            return None
        try:
            precision, scale = (
                int(part) for part in t[len("decimal(") : -1].split(",")
            )
        except ValueError:
            return None
        return precision, scale

    order = [
        "binary",
        "decimal",
        "double",
        "bigint",
        "int",
        "boolean",
        "timestamp",
        "date",
        "string",
    ]

    if base(t1) == "decimal" and base(t2) == "decimal":
        left, right = decimal_params(t1), decimal_params(t2)
        if left is not None and right is not None:
            scale = min(max(left[1], right[1]), 38)
            integer_digits = max(left[0] - left[1], right[0] - right[1])
            precision = min(max(integer_digits + scale, 1), 38)
            return f"decimal({precision},{scale})"

    return t1 if order.index(base(t1)) < order.index(base(t2)) else t2


def sample_after_fields(bucket: str, key: str, max_records: int = 50) -> Dict[str, str]:
    """Infer a type for each field of the ``after`` payload from sampled records.

    Reads at most ``max_records`` records from the Avro object at
    ``s3://bucket/key`` and merges the type inferred for every field value.

    Args:
        bucket: Bucket holding the Avro object.
        key: Key of the Avro object to sample.
        max_records: Upper bound on the number of records read; zero or a
            negative value reads nothing.

    Returns:
        Mapping of field name to merged type name. Empty when no sampled
        record carried a non-empty ``after`` dict.
    """
    types: Dict[str, str] = {}
    url = f"s3://{bucket}/{key}"
    with smart_open(url, "rb", transport_params={"client": s3}) as avro_file:
        # zip() with a range enforces the cap for every record, including the
        # ones skipped below, and never reads past the limit.
        for _, rec in zip(range(max_records), fastavro.reader(avro_file)):
            # A missing or None "after" is treated as having no fields.
            after = rec.get("after") or {}
            if not isinstance(after, dict):
                continue
            for field, value in after.items():
                inferred = infer_iceberg_type(value)
                types[field] = merge_types(types.get(field, inferred), inferred)
    return types


def build_yaml_for_all_topics() -> Dict[str, Any]:
    """Assemble the source/table document for every topic found under ``ROOT_PREFIX``.

    For each topic (in sorted order) the first ``.avro`` key is sampled to
    infer the field types of the ``after`` payload. Topics without an Avro
    file, or whose sample yields no fields, are left out.

    Returns:
        A dict with ``version`` and a single-element ``sources`` list whose
        ``tables`` entry holds one description per retained topic.
    """
    all_keys = list_all_keys(BUCKET, ROOT_PREFIX)

    # Distinct, non-empty topic names derived from the listed keys.
    topic_names = set()
    for key in all_keys:
        topic_name = topic_from_key(key)
        if topic_name:
            topic_names.add(topic_name)

    table_entries: List[Dict[str, Any]] = []
    for topic in sorted(topic_names):
        sample_key = first_avro_key_for_topic(all_keys, topic)
        if not sample_key:
            continue
        column_types = sample_after_fields(BUCKET, sample_key, max_records=50)
        if not column_types:
            continue

        columns = []
        for column in sorted(column_types):
            # "created_at" is exposed under the name "created_at_ms"; the
            # source path still points at the original field.
            exposed_name = "created_at_ms" if column == "created_at" else column
            columns.append(
                {
                    "name": exposed_name,
                    "type": column_types[column],
                    "source": f"after.{column}",
                }
            )

        table_entries.append(
            {
                "name": topic.split(".")[-1],
                "description": f"{topic} CDC stream",
                "source_path": f"s3a://{BUCKET}/{ROOT_PREFIX}{topic}/",
                "format": "avro",
                "filter_op": ["c", "u", "r"],
                "schema": columns,
                "partitions": ["event_date"],
                "table_properties": {
                    "format-version": "2",
                    "write.distribution-mode": "hash",
                },
            }
        )

    return {
        "version": 1,
        "sources": [
            {
                "name": "storefront_cdc",
                "description": "Debezium CDC topics landed by Kafka Connect in raw/kafka/",
                "tables": table_entries,
            }
        ],
    }


doc = build_yaml_for_all_topics()
# os.path.dirname() returns "" for a bare filename and os.makedirs("") raises,
# so the directory is only created when OUT_PATH actually contains one.
out_dir = os.path.dirname(OUT_PATH)
if out_dir:
    os.makedirs(out_dir, exist_ok=True)
# sort_keys=False keeps the key order produced by build_yaml_for_all_topics.
with open(OUT_PATH, "w") as f:
    yaml.safe_dump(doc, f, sort_keys=False)
print(f"Saved config to {OUT_PATH}")

Saved config to output/bronze_tables.yaml


## Check parquet

In [None]:
from typing import Dict
import polars as pl

# Bucket and key prefix under which the per-table Parquet output is looked up.
BUCKET_STAGE = "lakehouse"
PREFIX_STAGE = "stage/"

# Credentials and endpoint handed to polars when scanning Parquet files;
# the values are the ones read from the environment in the S3 setup cell.
storage_options: Dict[str, str] = {
    "aws_access_key_id": MINIO_ACCESS_KEY,
    "aws_secret_access_key": MINIO_SECRET_KEY,
    "endpoint_url": MINIO_ENDPOINT,
}


def scan_table(prefix: str) -> pl.LazyFrame:
    """Lazily scan all ``.parquet`` files matched by ``{prefix}**/*.parquet``.

    Args:
        prefix: Location prefix the glob is appended to; it is used verbatim,
            so it should end with a path separator.

    Returns:
        A LazyFrame over the matched files, opened with the module-level
        ``storage_options``.
    """
    parquet_glob = f"{prefix}**/*.parquet"
    lazy_frame = pl.scan_parquet(
        parquet_glob,
        low_memory=True,
        storage_options=storage_options,
    )
    return lazy_frame


tables = ["customers", "order_items", "orders", "payments", "products"]
parquet_paginator = s3.get_paginator("list_objects_v2")

for t in tables:
    table_prefix = f"{PREFIX_STAGE}{t}/"
    base = f"s3://{BUCKET_STAGE}/{table_prefix}"
    lf = scan_table(base)

    print(f"\n=== {t.upper()} ===")
    # collect_schema() resolves the schema explicitly; reading lf.schema on a
    # LazyFrame is what triggered the warning in the earlier output.
    print(lf.collect_schema())
    print(lf.head(5).collect())

    row_count = lf.select(pl.len()).collect().item()
    # Count the Parquet objects themselves. The previous version re-ran
    # pl.len() here, which reported the row count a second time.
    file_count = sum(
        1
        for page in parquet_paginator.paginate(
            Bucket=BUCKET_STAGE, Prefix=table_prefix
        )
        for obj in page.get("Contents", [])
        if obj["Key"].endswith(".parquet")
    )
    print(f"files={file_count:,}  rows≈{row_count:,}")


=== CUSTOMERS ===


  print(lf.schema)


Schema({'address': String, 'city': String, 'country': String, 'created_at': Int64, 'email': String, 'full_name': String, 'id': Int64, 'event_date': Date, 'event_date_source': String})
shape: (5, 9)
┌────────────┬────────────┬────────────┬────────────┬───┬───────────┬──────┬───────────┬───────────┐
│ address    ┆ city       ┆ country    ┆ created_at ┆ … ┆ full_name ┆ id   ┆ event_dat ┆ event_dat │
│ ---        ┆ ---        ┆ ---        ┆ ---        ┆   ┆ ---       ┆ ---  ┆ e         ┆ e_source  │
│ str        ┆ str        ┆ str        ┆ i64        ┆   ┆ str       ┆ i64  ┆ ---       ┆ ---       │
│            ┆            ┆            ┆            ┆   ┆           ┆      ┆ date      ┆ str       │
╞════════════╪════════════╪════════════╪════════════╪═══╪═══════════╪══════╪═══════════╪═══════════╡
│ 391 Case   ┆ Moranton   ┆ Isle of    ┆ 1735697918 ┆ … ┆ Aimee W.  ┆ 3034 ┆ 2025-01-0 ┆ created_a │
│ Cliff Apt. ┆            ┆ Man        ┆ 576590     ┆   ┆           ┆      ┆ 1         ┆ t     