# Programming with Kafka

In [1]:
import warnings
# NOTE(review): this silences every warning for the whole kernel session,
# including deprecation warnings from kafka-python and pandas; consider
# narrowing it with a `category=` or `module=` filter.
warnings.filterwarnings("ignore")

# NOTE(review): `plt` is not used in any of the visible cells.
import matplotlib.pyplot as plt
import pandas as pd

### 1. First, we install the Kafka Python client library (`kafka-python`)

In [2]:
# Uncomment the line below to install the Kafka client library.
# NOTE(review): `%pip install kafka-python` installs into the running kernel's
# interpreter, whereas `!pip3` may target a different Python; consider also
# pinning a version (e.g. `kafka-python==X.Y.Z`) for reproducibility.
#!pip3 install kafka-python

### 2. Then, we set up the connection.

In [3]:
from kafka import KafkaProducer, KafkaConsumer
from IPython.display import clear_output
import json
import time
import random
from datetime import datetime, timezone

In [4]:
def _encode_json_utf8(payload):
    """Serialize *payload* to JSON text and return it as UTF-8 bytes.

    Used as the producer's value serializer, so every record passed to
    ``producer.send`` can be a plain dict.
    """
    return json.dumps(payload).encode('utf-8')


# Producer connected to the local broker; values are JSON-encoded on send.
producer = KafkaProducer(
    value_serializer=_encode_json_utf8,
    bootstrap_servers='localhost:9092',
)

### 3. Now we can test the connection to the Kafka broker by sending a message to the previously created topic 'measurements'.

In [5]:
# Send one all-zero record as a connectivity check.
# `producer.send` is asynchronous and returns a future; `flush()` on its own
# does not surface a per-record delivery failure. Calling `.get()` on the
# future blocks until the broker acknowledges the record and re-raises any
# delivery error, so the success message is only printed on real success.
SEND_TIMEOUT_SECONDS = 10  # upper bound on how long to wait for the ack

test_message = {
    "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    "heart_rate": 0,
    "blood_oxygen": 0,
    "body_temperature": 0,
}
record_metadata = producer.send("measurements", test_message).get(
    timeout=SEND_TIMEOUT_SECONDS
)
print("Test message sent successfully to 'measurements' topic.")
print(f"  partition={record_metadata.partition}, offset={record_metadata.offset}")

Test message sent successfully to 'measurements' topic.


The test message was sent successfully, meaning the connection was established.

### 4. Next, we send 100 random health & fitness measurements (one per second, like readings from a smartwatch) to demonstrate writing data. The values are generated with Python's `random` module.

In [6]:
def generate_health_measurement(rng=None):
    """Return one simulated health reading as a JSON-serializable dict.

    Keys:
        timestamp        -- current UTC time formatted as
                            ``YYYY-MM-DDTHH:MM:SSZ`` (second precision).
        heart_rate       -- int drawn uniformly from 80..85 (inclusive).
        blood_oxygen     -- float in [98, 100], rounded to 1 decimal.
        body_temperature -- float in [36, 37], rounded to 1 decimal.

    Args:
        rng: optional ``random.Random`` instance used for the simulated
            values. Pass a seeded instance (e.g. ``random.Random(42)``) to get
            reproducible readings. Defaults to the module-level ``random``
            generator, which matches the original behavior.

    Returns:
        dict with the four keys above.
    """
    source = random if rng is None else rng
    # Draw order (randint, uniform, uniform) is kept fixed so a given seed
    # always yields the same sequence of readings.
    return {
        "timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
        "heart_rate": source.randint(80, 85),
        "blood_oxygen": round(source.uniform(98, 100), 1),
        "body_temperature": round(source.uniform(36, 37), 1),
    }

# Stream 100 simulated readings to the 'measurements' topic, one per second.
# `send` only queues each record; the final `flush()` blocks until the
# requests for all queued records have completed.
remaining = 100
while remaining:
    reading = generate_health_measurement()
    producer.send('measurements', reading)
    print("Sent:", reading)
    time.sleep(1)
    remaining -= 1

producer.flush()
print("Finished sending measurements.")

Sent: {'timestamp': '2025-06-19T19:01:44Z', 'heart_rate': 83, 'blood_oxygen': 98.2, 'body_temperature': 36.7}
Sent: {'timestamp': '2025-06-19T19:01:45Z', 'heart_rate': 80, 'blood_oxygen': 99.6, 'body_temperature': 36.7}
Sent: {'timestamp': '2025-06-19T19:01:46Z', 'heart_rate': 81, 'blood_oxygen': 98.2, 'body_temperature': 36.1}
Sent: {'timestamp': '2025-06-19T19:01:47Z', 'heart_rate': 81, 'blood_oxygen': 99.9, 'body_temperature': 36.0}
Sent: {'timestamp': '2025-06-19T19:01:48Z', 'heart_rate': 82, 'blood_oxygen': 99.3, 'body_temperature': 36.1}
Sent: {'timestamp': '2025-06-19T19:01:49Z', 'heart_rate': 82, 'blood_oxygen': 98.9, 'body_temperature': 36.8}
Sent: {'timestamp': '2025-06-19T19:01:50Z', 'heart_rate': 82, 'blood_oxygen': 98.6, 'body_temperature': 36.0}
Sent: {'timestamp': '2025-06-19T19:01:52Z', 'heart_rate': 80, 'blood_oxygen': 98.5, 'body_temperature': 36.8}
Sent: {'timestamp': '2025-06-19T19:01:53Z', 'heart_rate': 83, 'blood_oxygen': 99.5, 'body_temperature': 36.3}
Sent: {'ti

### 5. Now, to demonstrate reading messages, we use a KafkaConsumer.

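In each cycle, the producer sends 10 measurements (one per second). The consumer then polls for new messages for 2 seconds, updates the table (a pandas DataFrame), and displays it. This repeats up to 10 times, or stops early once no new data has been received for 20 seconds, so at most 100 more measurements are added. Because the consumer uses a fresh group ID and `auto_offset_reset='earliest'`, it also reads every message already in the topic: the test message from step 3 and the 100 measurements from step 4. That is why the final table has 201 rows.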

In [7]:
# A new, timestamp-based group id means this consumer starts with no committed
# offsets, so `auto_offset_reset='earliest'` makes it read the topic from the
# first stored message (earlier test/measurement records included).
consumer_group = f'measurement-consumers-{int(time.time())}'
consumer = KafkaConsumer(
    'measurements',
    group_id=consumer_group,
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    # Inverse of the producer's serializer: UTF-8 bytes -> JSON -> dict.
    value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
)

In [8]:
# Interleave producing and consuming so the table grows in near real time.
# Each cycle: send BATCH_SIZE readings (one per second), flush so the batch is
# on the broker, poll the consumer for POLL_SECONDS, then redraw the
# accumulated table. Stop after MAX_CYCLES cycles, or earlier once nothing new
# has arrived for IDLE_TIMEOUT_SECONDS.
MAX_CYCLES = 10
BATCH_SIZE = 10
POLL_SECONDS = 2
IDLE_TIMEOUT_SECONDS = 20

data = []
# Defined up front so later cells can use `df` even if no message arrives.
df = pd.DataFrame(data)
# time.monotonic() is used for elapsed-time checks because, unlike
# time.time(), it cannot jump when the wall clock is adjusted.
last_new_data_time = time.monotonic()

for _cycle in range(MAX_CYCLES):

    # Produce
    for _ in range(BATCH_SIZE):
        measurement = generate_health_measurement()
        producer.send('measurements', measurement)
        print("Sent:", measurement)
        time.sleep(1)
    # `send` only queues records; flush so this batch has been delivered
    # before the poll window below starts.
    producer.flush()

    # Consume (poll for POLL_SECONDS)
    start_len = len(data)
    poll_deadline = time.monotonic() + POLL_SECONDS
    while time.monotonic() < poll_deadline:
        msg_pack = consumer.poll(timeout_ms=500)
        for messages in msg_pack.values():
            data.extend(message.value for message in messages)
    new_data = data[start_len:]
    if new_data:
        last_new_data_time = time.monotonic()

    # Display real time changing table (rebuilt from everything read so far)
    clear_output(wait=True)
    df = pd.DataFrame(data)
    if data:
        print(df)
    else:
        print("No data collected yet.")
    if not new_data:
        print("No new data received in this cycle.")
    print('----------------------------------------')
    # UTC, to match the "...Z" timestamps inside the measurements.
    print(f"Updated table at {pd.Timestamp.now(tz='UTC')}")

    # Stop if no new data for IDLE_TIMEOUT_SECONDS
    if time.monotonic() - last_new_data_time > IDLE_TIMEOUT_SECONDS:
        print(f"No new data for {IDLE_TIMEOUT_SECONDS} seconds. Stopping.")
        break

producer.flush()
print("Finished sending measurements.")

                timestamp  heart_rate  blood_oxygen  body_temperature
0    2025-06-19T19:01:44Z           0           0.0               0.0
1    2025-06-19T19:01:44Z          83          98.2              36.7
2    2025-06-19T19:01:45Z          80          99.6              36.7
3    2025-06-19T19:01:46Z          81          98.2              36.1
4    2025-06-19T19:01:47Z          81          99.9              36.0
..                    ...         ...           ...               ...
196  2025-06-19T19:05:18Z          80          98.9              36.7
197  2025-06-19T19:05:19Z          82          99.4              36.6
198  2025-06-19T19:05:20Z          80          98.4              36.2
199  2025-06-19T19:05:21Z          82          98.8              36.3
200  2025-06-19T19:05:22Z          81          98.6              36.5

[201 rows x 4 columns]
----------------------------------------
Updated table at 2025-06-19 21:05:25.886616
Finished sending measurements.


In [9]:
# Final table of every message the consumer read in the previous cell
# (in the run shown, row 0 is the all-zero test message from step 3).
# The notebook renders only the first and last rows of long frames.
df

Unnamed: 0,timestamp,heart_rate,blood_oxygen,body_temperature
0,2025-06-19T19:01:44Z,0,0.0,0.0
1,2025-06-19T19:01:44Z,83,98.2,36.7
2,2025-06-19T19:01:45Z,80,99.6,36.7
3,2025-06-19T19:01:46Z,81,98.2,36.1
4,2025-06-19T19:01:47Z,81,99.9,36.0
...,...,...,...,...
196,2025-06-19T19:05:18Z,80,98.9,36.7
197,2025-06-19T19:05:19Z,82,99.4,36.6
198,2025-06-19T19:05:20Z,80,98.4,36.2
199,2025-06-19T19:05:21Z,82,98.8,36.3
