In [1]:
!pip install confluent-kafka==1.7.0



In [2]:
from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions
from confluent_kafka import KafkaException
import sys
from uuid import uuid4

In [3]:
bootstrap_server = "kafka:9092" # Brokers act as cluster entripoints

In [4]:
conf = {'bootstrap.servers': bootstrap_server}

In [5]:
a = AdminClient(conf)

In [6]:
md = a.list_topics(timeout=10)
print(" {} topics:".format(len(md.topics)))
for t in iter(md.topics.values()):
    if t.error is not None:
        errstr = ": {}".format(t.error)
    else:
        errstr = ""
    print("  \"{}\" with {} partition(s){}".format(t, len(t.partitions), errstr))

 4 topics:
  "SmokeSensorEvent" with 1 partition(s)
  "TemperatureSensorEvent" with 1 partition(s)
  "_schemas" with 1 partition(s)
  "__consumer_offsets" with 50 partition(s)


In [7]:
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import *
import time

topic = "TemperatureSensorEvent"

def delivery_report(err, msg):
    if err is not None:
        print("Failed to deliver message: {}".format(err))
    else:
        print("Produced record to topic {} partition [{}] @ offset {}"
              .format(msg.topic(), msg.partition(), msg.offset()))

In [8]:
producer_conf = {
        'bootstrap.servers': bootstrap_server,
        'key.serializer': StringSerializer('utf_8'),
        'value.serializer': StringSerializer('utf_8')
}

producer = SerializingProducer(producer_conf)

## run the following cell to send temperature around 20°C

In [9]:
import json
from random import gauss
from IPython.display import clear_output

while True:
    key = "S1"
    value = {"sensor": "S1","temperature": gauss(20, 1.0),"ts":int(time.time())}
    producer.produce(topic=topic, value=json.dumps(value), key=key, on_delivery=delivery_report)
    print(value)
    producer.poll(1)
    time.sleep(10)
    clear_output(wait=True)


{'sensor': 'S1', 'temperature': 19.822324525257955, 'ts': 1729315226}
Produced record to topic TemperatureSensorEvent partition [0] @ offset 3


KeyboardInterrupt: 

to interrupt the execution of the cell, prese the square icon in the bar or choose *interrupt kernel* from the *kernel* dropdown menu

## run the following cell to send temperature around 55°C

In [10]:
while True:
    key = "S1"
    value = {"sensor": "S1","temperature": gauss(55, 1.0),"ts":int(time.time())}
    producer.produce(topic=topic, value=json.dumps(value), key=key, on_delivery=delivery_report)
    print(value)
    producer.poll(1)
    time.sleep(10)    
    clear_output(wait=True)

{'sensor': 'S1', 'temperature': 55.40752226037262, 'ts': 1729316087}
Produced record to topic TemperatureSensorEvent partition [0] @ offset 89


KeyboardInterrupt: 