In [4]:
from confluent_kafka import Producer, Consumer


def read_config():
  # reads the client configuration from client.properties
  # and returns it as a key-value map
  config = {}
  with open("client.properties") as fh:
    for line in fh:
      line = line.strip()
      if len(line) != 0 and line[0] != "#":
        parameter, value = line.strip().split('=', 1)
        config[parameter] = value.strip()
  return config

def produce(topic, config):
  # creates a new producer instance
  producer = Producer(config)

  # produces a sample message
  key = "Name"
  value = "Parul"
  producer.produce(topic, key=key, value=value)
  print(f"Produced message to topic {topic}: key = {key:12} value = {value:12}")

  # send any outstanding or buffered messages to the Kafka broker
  producer.flush()

def consume(topic, config):
  # sets the consumer group ID and offset  
  config["group.id"] = "python-group-1"
  config["auto.offset.reset"] = "earliest"

  # creates a new consumer instance
  consumer = Consumer(config)

  # subscribes to the specified topic
  consumer.subscribe([topic])

  try:
    while True:
      # consumer polls the topic and prints any incoming messages
      msg = consumer.poll(1.0)
      if msg is not None and msg.error() is None:
        key = msg.key().decode("utf-8")
        value = msg.value().decode("utf-8")
        print(f"Consumed message from topic {topic}: key = {key:12} value = {value:12}")
  except KeyboardInterrupt:
    pass
  finally:
    # closes the consumer connection
    consumer.close()

In [5]:

config = read_config()
topic = "user-topic"

produce(topic, config)


%4|1737756134.040|CONFWARN|ccloud-python-client-a7265fc9-3f9c-4ec1-814a-53775c522002#producer-3| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance


Produced message to topic user-topic: key = Name         value = Parul       


%6|1737756134.380|GETSUBSCRIPTIONS|ccloud-python-client-a7265fc9-3f9c-4ec1-814a-53775c522002#producer-3| [thrd:main]: Telemetry client instance id changed from AAAAAAAAAAAAAAAAAAAAAA to YpWQLzmGRJCXUIaUnVGSvw


In [6]:
consume(topic, config)

%6|1737756144.428|GETSUBSCRIPTIONS|ccloud-python-client-a7265fc9-3f9c-4ec1-814a-53775c522002#consumer-4| [thrd:main]: Telemetry client instance id changed from AAAAAAAAAAAAAAAAAAAAAA to ApGnQIZBTQmHdpn4SifSmg


Consumed message from topic user-topic: key = 234          value = "KeyValue-234"
Consumed message from topic user-topic: key = 345          value = "KeyValue-345"
Consumed message from topic user-topic: key = 456          value = "KeyValue-456"
Consumed message from topic user-topic: key = 789          value = "KeyValue-789"
Consumed message from topic user-topic: key = 890          value = "KeyValue-890"
Consumed message from topic user-topic: key = Name         value = Parul       
Consumed message from topic user-topic: key = Name         value = Parul       
Consumed message from topic user-topic: key = Name         value = Parul       
Consumed message from topic user-topic: key = 1234213      value = "KeyValue"  
Consumed message from topic user-topic: key = 678          value = "KeyValue-678"
Consumed message from topic user-topic: key = 901          value = "KeyValue-901"
