In [1]:
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
from google.protobuf.json_format import ParseDict
from google.protobuf.descriptor_pool import DescriptorPool
import my_event_pb2
import time
import random

In [2]:
# Set up the Schema Registry client
schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Set up the Protobuf serializer with the required configuration
serializer_conf = {'use.deprecated.format': False}
protobuf_serializer = ProtobufSerializer(my_event_pb2.MyEvent, schema_registry_client, serializer_conf)

In [3]:
def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

producer_conf = {
    'bootstrap.servers': 'kafka-1:29092,kafka-2:29093,kafka-3:29094',
}

producer = Producer(producer_conf)

In [4]:
while True:
    event = my_event_pb2.MyEvent()
    event.timestamp.GetCurrentTime()
    event.value = random.randint(1, 100)

    serialized_event = protobuf_serializer(event, SerializationContext('my-topic', MessageField.VALUE))

    producer.produce(
        'my-topic',
        key=str(event.timestamp.seconds),
        value=serialized_event,
        on_delivery=delivery_report
    )
    producer.flush()

    print(f"Produced value: {event.value}")
    print(f"Timestamp: {event.timestamp}")
    time.sleep(5)

Message delivered to my-topic [2]
Produced value: 51
Timestamp: seconds: 1725445645
nanos: 316601000

Message delivered to my-topic [0]
Produced value: 72
Timestamp: seconds: 1725445651
nanos: 63830000

Message delivered to my-topic [5]
Produced value: 9
Timestamp: seconds: 1725445656
nanos: 75312000

Message delivered to my-topic [5]
Produced value: 32
Timestamp: seconds: 1725445661
nanos: 86939000

Message delivered to my-topic [2]
Produced value: 40
Timestamp: seconds: 1725445666
nanos: 88861000

Message delivered to my-topic [2]
Produced value: 93
Timestamp: seconds: 1725445671
nanos: 89761000

Message delivered to my-topic [1]
Produced value: 22
Timestamp: seconds: 1725445676
nanos: 91568000

Message delivered to my-topic [3]
Produced value: 87
Timestamp: seconds: 1725445681
nanos: 104474000

Message delivered to my-topic [4]
Produced value: 47
Timestamp: seconds: 1725445686
nanos: 117672000

Message delivered to my-topic [0]
Produced value: 56
Timestamp: seconds: 1725445691
nanos

KeyboardInterrupt: 