# Kafka Demo

### Connect to Kafka Broker Server 
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
Find how to connect to Kafka server on Canvas lab 2 assignment page.

### To kill connection
```
lsof -ti:<local_port> | xargs kill -9
```

### Setup
```
python -m pip install kafka-python
```

In [1]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

topic = 'recitation-c' 

### Producer Mode -> Writes Data to Broker

In [2]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                        value_serializer=lambda x: dumps(x).encode('utf-8'))

cities = ['Cologne','Paris','Panama City','Podgorica','Algiers','Rome','Yerevan']

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

Writing to Kafka Broker
Writing: 2026-02-13 21:08:28,Rome,18ºC
Writing: 2026-02-13 21:08:29,Paris,19ºC
Writing: 2026-02-13 21:08:30,Yerevan,28ºC
Writing: 2026-02-13 21:08:31,Algiers,23ºC
Writing: 2026-02-13 21:08:32,Cologne,32ºC
Writing: 2026-02-13 21:08:33,Rome,30ºC
Writing: 2026-02-13 21:08:34,Paris,19ºC
Writing: 2026-02-13 21:08:35,Paris,22ºC
Writing: 2026-02-13 21:08:36,Podgorica,20ºC
Writing: 2026-02-13 21:08:37,Rome,20ºC


### Consumer Mode -> Reads Data from Broker

In [None]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest', #Experiment with different values
    enable_auto_commit=True,
    auto_commit_interval_ms=1000
)

print('Reading Kafka Broker')
for message in consumer:
    message = message.value.decode()
    # Default message.value type is bytes!
    print(loads(message))
    os.system(f"echo {message} >> kafka_log.csv")

Reading Kafka Broker
2026-02-13 20:38:04,Rome,24ºC
2026-02-13 20:38:05,Panama City,30ºC
2026-02-13 20:38:06,Panama City,21ºC
2026-02-13 20:38:07,Yerevan,25ºC
2026-02-13 20:38:08,Paris,21ºC
2026-02-13 20:38:09,Rome,22ºC
2026-02-13 20:38:10,Paris,18ºC
2026-02-13 20:38:11,Algiers,25ºC
2026-02-13 20:38:12,Panama City,18ºC
2026-02-13 20:38:13,Panama City,31ºC
2026-02-13 21:08:28,Rome,18ºC
2026-02-13 21:08:29,Paris,19ºC
2026-02-13 21:08:30,Yerevan,28ºC
2026-02-13 21:08:31,Algiers,23ºC
2026-02-13 21:08:32,Cologne,32ºC
2026-02-13 21:08:33,Rome,30ºC
2026-02-13 21:08:34,Paris,19ºC
2026-02-13 21:08:35,Paris,22ºC
2026-02-13 21:08:36,Podgorica,20ºC
2026-02-13 21:08:37,Rome,20ºC


# Use kcat!
It's a CLI (Command Line Interface). Previously known as kafkacat


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [None]:
kcat -b localhost:9092 -t recitation-c -C -o beginning 
"2026-02-13 20:38:04,Rome,24\u00baC"
"2026-02-13 20:38:05,Panama City,30\u00baC"
"2026-02-13 20:38:06,Panama City,21\u00baC"
"2026-02-13 20:38:07,Yerevan,25\u00baC"
"2026-02-13 20:38:08,Paris,21\u00baC"
"2026-02-13 20:38:09,Rome,22\u00baC"
"2026-02-13 20:38:10,Paris,18\u00baC"
"2026-02-13 20:38:11,Algiers,25\u00baC"
"2026-02-13 20:38:12,Panama City,18\u00baC"
"2026-02-13 20:38:13,Panama City,31\u00baC"
% Reached end of topic recitation-c [0] at offset 10

kcat -b localhost:9092 -t recitation-c -C -o beginning -e | wc -l
% Reached end of topic recitation-c [0] at offset 10: exiting
      10

