In [1]:
topic_name = "all_datatypes_protobuf"
schema_file_path = "./schemas/protobuf_all_datatypes.proto"

In [2]:
import json


class Config:
    def __init__(self):
        config = json.loads(open("../config.json").read())
        self.kc_endpoint = config["kc_endpoint"]
        self.kc_api_key = config["kc_api_key"]
        self.kc_api_secret = config["kc_api_secret"]
        self.sr_endpoint = config["sr_endpoint"]
        self.sr_api_key = config["sr_api_key"]
        self.sr_api_secret = config["sr_api_secret"]


c = Config()

In [3]:
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

import datagen
from schemas import protobuf_all_datatypes_pb2

# serializer setup
sr_client = SchemaRegistryClient(
    {
        "url": c.sr_endpoint,
        "basic.auth.user.info": f"{c.sr_api_key}:{c.sr_api_secret}",
    }
)
serializer = ProtobufSerializer(
    msg_type=protobuf_all_datatypes_pb2.SampleMessage,
    schema_registry_client=sr_client,
    conf={
        "use.deprecated.format": True,
    },
    # to_dict=lambda x, ctx: x,  # I still don't know why this is necessary.
)

conf = {
    "bootstrap.servers": c.kc_endpoint,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": c.kc_api_key,
    "sasl.password": c.kc_api_secret,
}
producer = Producer(conf)


def produce_messages(num_messages: int = 5):
    messages_sent = []

    for _ in range(num_messages):
        key = datagen.generate_key()
        value = datagen.generate_protobuf_all_datatypes()
        headers = datagen.generate_headers()
        serialized_value = serializer(
            value, SerializationContext(topic_name, MessageField.VALUE)
        )
        producer.produce(
            topic_name,
            key=key,
            value=serialized_value,
            headers=headers,
            on_delivery=lambda err, msg: print(f"{err=}") if err else print(f"{msg=}"),
        )
        messages_sent.append(
            {
                "key": key,
                "value": value,
                "headers": headers,
                "topic": topic_name,
                "serialized_value": serialized_value,
            }
        )
    producer.flush()

    return messages_sent


messages = produce_messages(5)



msg=<cimpl.Message object at 0x116b677c0>
msg=<cimpl.Message object at 0x116b677c0>
msg=<cimpl.Message object at 0x116b677c0>
msg=<cimpl.Message object at 0x116b677c0>
msg=<cimpl.Message object at 0x116b677c0>


In [5]:
# need a better way of getting protobuf message size
len(str(messages[0])), messages[0]

(1403,
 {'key': '2e64bad0-7f7b-4ceb-9a47-c60bc29b45ba',
  'value': boolean_field: true
  int32_field: 278882313
  int64_field: 7305319665803605186
  uint32_field: 163158795
  uint64_field: 6313998473384579906
  sint32_field: 1568451607
  sint64_field: 2911466140558377397
  fixed32_field: 606719128
  fixed64_field: 5862605961704079185
  sfixed32_field: -1358856642
  sfixed64_field: -2564473171884737350
  float_field: 3.72258e+06
  double_field: 55
  bytes_field: "\261i}];^\220\324\211j"
  string_field: "Skill institution information realize kind attorney skill."
  enum_field: ENUM_TYPE3
  nested_message_field {
    nested_int32_field: 4161
  }
  repeated_boolean_field: true
  repeated_boolean_field: true
  repeated_boolean_field: true
  map_field {
    key: "key1"
    value: 6466
  }
  oneof_int32_field: 6837,
  'headers': {'foo': 'magazine',
   'bar': '8533',
   'baz': '358120',
   'schema_id': '100001',
   'schema_version': '1'},
  'topic': 'all_datatypes_protobuf',
  'serialized_valu