In [18]:
import functools
import io
import json

import avro
import avro.io
import avro.schema
from kafka import KafkaConsumer


def fetch_schema():
    """Return the schema strings of every registered ``Envelope`` schema.

    Asks the Schema Registry at ``http://localhost:8081`` for the latest
    version registered under each subject and keeps the schemas whose
    top-level ``name`` is ``"Envelope"``.

    Returns:
        list[str]: the raw schema JSON strings, in the order the registry
        lists their subjects. Empty when nothing matches.
    """
    from confluent_kafka.schema_registry import SchemaRegistryClient

    sr = SchemaRegistryClient({"url": "http://localhost:8081"})
    schemas = []
    for subject in sr.get_subjects():
        schema_str = sr.get_latest_version(subject).schema.schema_str

        try:
            schema_json = json.loads(schema_str)
        except ValueError:
            # The schema text is not JSON at all, so it cannot be the record
            # we are looking for; skip it instead of aborting the whole scan.
            continue

        # A schema does not have to be a JSON object: a primitive such as
        # "string" parses to a str and a union parses to a list, and neither
        # can be indexed with "name".
        if isinstance(schema_json, dict) and schema_json.get("name") == "Envelope":
            schemas.append(schema_str)

    return schemas


# Confluent Schema Registry wire format: one magic byte (0) followed by a
# 4-byte schema id, then the Avro-encoded payload.
_WIRE_MAGIC_BYTE = 0
_WIRE_HEADER_SIZE = 5


@functools.lru_cache(maxsize=None)
def _parse_schema(schema: str):
    """Parse an Avro schema JSON string once and reuse it for later messages."""
    return avro.schema.parse(schema)


def decode_method(string: bytes, schema: str):
    """Decode one Schema-Registry-framed Avro message.

    Args:
        string: raw message value; must start with the 5-byte wire-format
            header (magic byte 0 + 4-byte schema id).
        schema: Avro schema as a JSON string, used as the reader schema.

    Returns:
        The datum produced by ``avro.io.DatumReader.read`` for the payload
        that follows the header.

    Raises:
        ValueError: if ``string`` is empty/None, shorter than the header, or
            does not start with the magic byte (e.g. a JSON-encoded message).
    """
    # Validate before decoding: blindly skipping 5 bytes of a payload that is
    # not in this framing would either fail deep inside the Avro decoder or,
    # worse, yield a plausible-looking but wrong record.
    if not string or len(string) < _WIRE_HEADER_SIZE:
        raise ValueError(
            f"message too short for the {_WIRE_HEADER_SIZE}-byte wire-format header"
        )
    if string[0] != _WIRE_MAGIC_BYTE:
        raise ValueError(
            "message does not start with wire-format magic byte 0 "
            f"(got {bytes(string[:_WIRE_HEADER_SIZE])!r})"
        )

    reader = avro.io.DatumReader(_parse_schema(schema))

    payload = io.BytesIO(string)
    payload.seek(_WIRE_HEADER_SIZE)  # skip magic byte + schema id
    return reader.read(avro.io.BinaryDecoder(payload))


# Connection settings and the topics to read, gathered here so they are easy
# to find and change.
BOOTSTRAP_SERVERS = ["localhost:29092"]
TOPICS = ["postgres.sales.users", "postgres.sales.products", "postgres.sales.orders"]
# Stop iterating after this many milliseconds without a new message. Without
# it, iterating the consumer blocks forever and the cell can only be ended by
# interrupting the kernel.
CONSUMER_TIMEOUT_MS = 10_000

consumer = KafkaConsumer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    auto_offset_reset="earliest",  # start from the oldest available message
    enable_auto_commit=False,
    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,
)

consumer.subscribe(TOPICS)

In [19]:
# Fetch the "Envelope" schema strings once, before consuming, so the loop
# below can reuse them for every message.
schemas = fetch_schema()

In [20]:
# Index the schemas by namespace once, instead of re-parsing every schema for
# every message. The namespace is what gets compared with the topic name.
# setdefault keeps the first schema seen for a namespace.
schema_by_topic = {}
for schema_str in schemas:
    schema_by_topic.setdefault(json.loads(schema_str).get("namespace"), schema_str)

for message in consumer:
    if message.value is None:
        # A record with a null value (e.g. a tombstone) has nothing to decode.
        continue

    # Look the schema up fresh for each message. Carrying it over from the
    # previous iteration would decode this message with another topic's
    # schema (or raise NameError on the very first message).
    schema = schema_by_topic.get(message.topic)
    if schema is None:
        print(f"topic_name: {message.topic}, skipped: no matching Envelope schema")
        continue

    decoded_value = decode_method(message.value, schema)
    print(f"topic_name: {message.topic}, value: {decoded_value}")


topic_name: postgres.sales.users, value: b'{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"created_at"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"updated_at"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"deleted_at"}],"optional":true,"name":"postgres.sales.users.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestam

KeyboardInterrupt: 