In [4]:
from dotenv import load_dotenv
import os
load_dotenv()

SCHEMA_REGISTRY = os.getenv("SCHEMA_REGISTRY")
BOOTSTRAP_SERVER = os.getenv("BOOTSTRAP_SERVER")
USERNAME = os.getenv("USERNAME")
PASSWORD = os.getenv("PASSWORD")


In [5]:
%%markdown

Consumer with schema registry


Consumer with schema registry


In [6]:
from confluent_kafka import Consumer, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Setup AvroDeserializer with the Schema Registry Client
avro_deserializer = AvroDeserializer(SchemaRegistryClient({
    'url': SCHEMA_REGISTRY,
    'basic.auth.user.info': USERNAME + ":" + PASSWORD
}))

# Consumer configuration
consumer = Consumer({
    'bootstrap.servers': BOOTSTRAP_SERVER,
    'sasl.mechanism': 'SCRAM-SHA-256',
    'security.protocol': 'SASL_SSL',
    'sasl.username': USERNAME,
    'sasl.password': PASSWORD,
    'group.id': 'default-consumer-group',
    'auto.offset.reset': 'earliest'
})

# Subscribe to the topic
consumer.subscribe(["default-topic"])

# Poll messages from Kafka, deserialize, and handle potential errors
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        try:
            deserialized = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
            if deserialized is not None:
                print(f"Key: {msg.key()}, Value: {deserialized}")
        except Exception as e:
            print(f"Deserialization error for message {msg}: {e}")
finally:
    consumer.close()


Key: b'0bcfe6e0-cffe-4f02-8e98-6ca372aee9ba', Value: {'name': 'user', 'favorite_number': 73}
Key: b'c605f3db-8358-464e-a37f-a1b59e320659', Value: {'name': 'user', 'favorite_number': 43}


KeyboardInterrupt: 

Consumer

In [None]:
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'default-topic',
    bootstrap_servers=BOOTSTRAP_SERVER,
    sasl_mechanism='SCRAM-SHA-256',
    security_protocol='SASL_SSL',
    sasl_plain_username=USERNAME,
    sasl_plain_password=PASSWORD,
    group_id='default-consumer-group',
    auto_offset_reset='earliest'
)

try:
    for message in consumer:
        print(f"Received message: {message.value}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()