In [4]:
from confluent_kafka import Producer
import json

def delivery_report(err, msg):
    """Print the outcome of one produce() request.

    Meant to be handed to ``produce()`` as its ``callback``; it is invoked
    once per message from ``poll()`` or ``flush()``.

    Args:
        err: The delivery error, or ``None`` when delivery succeeded.
        msg: The message the report is about. Only ``topic()`` and
            ``partition()`` are read, and only on success.
    """
    if err is None:
        destination = f'{msg.topic()} [{msg.partition()}]'
        print(f'Message delivered to {destination}')
        return
    print(f'Message delivery failed: {err}')

# Topic that receives the demo events
topic = 'quickstart-events'

# Producer connected to the broker on localhost
p = Producer({'bootstrap.servers': 'localhost:9092'})

for i in range(10):
    # Serve delivery report callbacks left over from earlier produce() calls
    # without blocking (timeout of 0).
    p.poll(0)

    # produce() is asynchronous: it only enqueues the message. delivery_report
    # runs later, from poll() above or flush() below, once the message has been
    # delivered or has failed permanently.
    data = {'number': i}
    payload = json.dumps(data).encode('utf-8')
    p.produce(topic, payload, callback=delivery_report)

# Block until every outstanding message is delivered and its delivery report
# callback has fired. Kept as the last expression so the notebook shows the
# value flush() returns.
p.flush()


Message delivered to quickstart-events [0]
Message delivered to quickstart-events [0]
Message delivered to quickstart-events [0]
Message delivered to quickstart-events [0]
Message delivered to quickstart-events [0]
Message delivered to quickstart-events [0]
Message delivered to quickstart-events [0]
Message delivered to quickstart-events [0]
Message delivered to quickstart-events [0]
Message delivered to quickstart-events [0]


0

In [None]:
from confluent_kafka import Consumer, KafkaError, KafkaException

# Configure the consumer
c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['quickstart-events'])

# Stop once this many real messages have been printed (the producer cell
# sends 10) ...
MAX_MESSAGES = 10
# ... or once this many polls have come back without a message, so the cell
# always terminates even when the topic is empty or the broker is unreachable.
MAX_IDLE_POLLS = 10
POLL_TIMEOUT_S = 1.0  # Adjust poll timeout as needed

try:
    received = 0
    idle_polls = 0
    # Messages and idle polls are counted separately: polls that return nothing
    # (common while the consumer is still joining its group) must not eat into
    # the message budget.
    while received < MAX_MESSAGES and idle_polls < MAX_IDLE_POLLS:
        msg = c.poll(timeout=POLL_TIMEOUT_S)
        if msg is None:
            # Nothing arrived within the timeout
            idle_polls += 1
            continue

        err = msg.error()
        if err:
            if err.code() == KafkaError._PARTITION_EOF:
                # End of partition event: informational, not a failure
                print(f'{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}')
                idle_polls += 1
                continue
            # Any other error is a real failure; surface it to the reader
            raise KafkaException(err)

        # Message is a normal message
        print(f'Message: {msg.value().decode("utf-8")}')
        received += 1
finally:
    # Clean up on exit
    c.close()
