# Kafka Demo

### Connect to Kafka Broker Server 
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
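Here `-N` runs no remote command, `-T` disables pseudo-terminal allocation, and `-f` sends `ssh` to the background once the tunnel is up. See the Canvas Lab 2 assignment page for the connection details of the Kafka server.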

### Kill the Connection
```
lsof -ti:<local_port> | xargs kill -9
```
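
To confirm that the tunnel is up (or that it is gone after the kill command), you can check whether the local port accepts TCP connections. This is a minimal sketch using only the standard library; `is_port_open` is a hypothetical helper name, and the port in the usage comment is a placeholder for your own `<local_port>`.

```python
import socket

def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False

# Usage (placeholder port; substitute your own <local_port>):
# print(is_port_open("localhost", 9092))
```

The check only shows that something is listening on the port; it does not verify that the listener is a Kafka broker.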

### Setup
```
python -m pip install kafka-python
```

In [1]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

from dotenv import load_dotenv
load_dotenv()

# Update this for your own recitation section :)
topic = 'mlip-kafka-topic-0'  # replace the suffix with the one assigned to your section
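
The cells in this notebook read three variables from `os.environ`, which `load_dotenv()` fills in from a `.env` file. A missing variable otherwise surfaces only as a `KeyError` when the producer or consumer is created, so a quick check up front can save time. The sketch below is one possible way to do that; `missing_env_vars` and `REQUIRED_VARS` are hypothetical names, while the variable names themselves are the ones used in this notebook.

```python
import os

# Variable names taken from the producer and consumer cells in this notebook
REQUIRED_VARS = (
    "CONFLUENT_BOOTSTRAP_SERVER",
    "CONFLUENT_API_KEY",
    "CONFLUENT_API_SECRET",
)

def missing_env_vars(names=REQUIRED_VARS, env=None):
    """Return the names that are unset or empty in env (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]

missing = missing_env_vars()
if missing:
    print(f"Missing in .env or environment: {', '.join(missing)}")
```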

### Producer Mode -> Writes Data to Broker

In [4]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [TODO]: Set CONFLUENT_BOOTSTRAP_SERVER, CONFLUENT_API_KEY, and CONFLUENT_API_SECRET in your .env file
producer = KafkaProducer(bootstrap_servers=os.environ['CONFLUENT_BOOTSTRAP_SERVER'],
                         security_protocol='SASL_SSL',
                         sasl_mechanism='PLAIN',
                         sasl_plain_username=os.environ['CONFLUENT_API_KEY'],
                         sasl_plain_password=os.environ['CONFLUENT_API_SECRET'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

# [TODO]: Add cities of your choice
cities = ['Montreal', 'Toronto', 'New York', 'Los Angeles']

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

Writing to Kafka Broker
Writing: 2025-10-28 22:10:57,New York,20ºC
Writing: 2025-10-28 22:10:58,Montreal,23ºC
Writing: 2025-10-28 22:10:59,Montreal,21ºC
Writing: 2025-10-28 22:11:00,Toronto,26ºC
Writing: 2025-10-28 22:11:01,Montreal,23ºC
Writing: 2025-10-28 22:11:02,Los Angeles,18ºC
Writing: 2025-10-28 22:11:03,Los Angeles,25ºC
Writing: 2025-10-28 22:11:04,Montreal,21ºC
Writing: 2025-10-28 22:11:05,Toronto,22ºC
Writing: 2025-10-28 22:11:06,Montreal,26ºC
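
The producer's `value_serializer` turns each record into JSON and then into UTF-8 bytes. The sketch below reproduces that step without a broker and pairs it with the inverse operation; `serialize` and `deserialize` are hypothetical helper names. Because `dumps` escapes non-ASCII characters by default, the `º` in each record travels as `\u00ba`, which is why raw consumers show that escape sequence.

```python
from json import dumps, loads

def serialize(value) -> bytes:
    """Same transformation as the producer's value_serializer."""
    return dumps(value).encode('utf-8')

def deserialize(raw: bytes):
    """Inverse of serialize: decode the UTF-8 bytes, then parse the JSON."""
    return loads(raw.decode('utf-8'))

record = '2025-10-28 22:10:57,New York,20ºC'
raw = serialize(record)
print(raw)               # JSON-quoted bytes, with º escaped as \u00ba
print(deserialize(raw))  # the original string
```

A function like `deserialize` could be passed to `KafkaConsumer` through its `value_deserializer` parameter, so that `message.value` arrives already decoded.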


### Consumer Mode -> Reads Data from Broker

In [12]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# [TODO]: Review the parameters below against the Kafka documentation linked above
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=os.environ['CONFLUENT_BOOTSTRAP_SERVER'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username=os.environ['CONFLUENT_API_KEY'],
    sasl_plain_password=os.environ['CONFLUENT_API_SECRET'],
    auto_offset_reset='earliest',
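    # Periodically commit the offsets that have been read
    # (kafka-python disables offset commits when no group_id is set, as here)
    enable_auto_commit=True,
    # How often (in ms) to commit read offsets to Kafka
    auto_commit_interval_ms=1000,
    # Stop iterating if no message arrives for 10 seconds
    consumer_timeout_ms=10000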
)

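print('Reading Kafka Broker')
with open('kafka_log.csv', 'a', encoding='utf-8') as log_file:
    for message in consumer:
        # message.value is bytes by default: decode it, then parse the JSON string
        record = loads(message.value.decode('utf-8'))
        print(record)
        # Append with Python file I/O rather than passing message text to a shell
        log_file.write(record + '\n')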

Reading Kafka Broker
2025-10-28 22:09:53,Montreal,19ºC
2025-10-28 22:10:58,Montreal,23ºC
2025-10-28 22:10:59,Montreal,21ºC
2025-10-28 22:11:06,Montreal,26ºC
2025-10-28 22:09:58,Montreal,29ºC
2025-10-28 22:11:02,Los Angeles,18ºC
2025-10-28 22:09:52,Montreal,23ºC
2025-10-28 22:09:57,Montreal,25ºC
2025-10-28 22:09:59,Montreal,22ºC
2025-10-28 22:11:01,Montreal,23ºC
2025-10-28 22:11:03,Los Angeles,25ºC
2025-10-28 22:11:05,Toronto,22ºC
2025-10-28 22:11:00,Toronto,26ºC
2025-10-28 22:09:54,Montreal,20ºC
2025-10-28 22:09:55,Montreal,22ºC
2025-10-28 22:09:56,Montreal,21ºC
2025-10-28 22:10:00,Montreal,28ºC
2025-10-28 22:10:01,Montreal,25ºC
2025-10-28 22:10:57,New York,20ºC
2025-10-28 22:11:04,Montreal,21ºC
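
The records above do not come back in timestamp order. Kafka guarantees ordering only within a single partition, so this interleaving is what you would expect if the topic has more than one partition (an assumption; the partition count is not stated here). If chronological order matters downstream, the records can be sorted after reading. A minimal sketch, with `parse_record` as a hypothetical helper and three records copied from the output above:

```python
from datetime import datetime

def parse_record(record: str):
    """Split 'timestamp,city,temperature' into a (datetime, str, str) tuple."""
    timestamp, city, temperature = record.split(',')
    return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S'), city, temperature

records = [
    '2025-10-28 22:10:58,Montreal,23ºC',
    '2025-10-28 22:11:02,Los Angeles,18ºC',
    '2025-10-28 22:09:52,Montreal,23ºC',
]

# Tuples sort by their first element, so this orders the records by timestamp
for timestamp, city, temperature in sorted(parse_record(r) for r in records):
    print(timestamp, city, temperature)
```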


# Use kcat!
kcat is a command-line interface (CLI) for producing and consuming Kafka messages. It was previously known as kafkacat.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [25]:
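# kcat command: connect to the Kafka broker, specify a topic, and consume messages from the earliest offset (-e exits after the last message)
# Each `!` line runs in its own subshell, so an `export` there would not persist;
# kcat instead inherits the variables that load_dotenv() placed in os.environ earlier.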
!kcat -b $CONFLUENT_BOOTSTRAP_SERVER -t mlip-kafka-topic-0 -X security.protocol=SASL_SSL -X sasl.mechanism=PLAIN -X sasl.username=$CONFLUENT_API_KEY -X sasl.password=$CONFLUENT_API_SECRET -C -o earliest -e

"2025-10-28 22:09:52,Montreal,23\u00baC"
"2025-10-28 22:09:57,Montreal,25\u00baC"
"2025-10-28 22:09:59,Montreal,22\u00baC"
"2025-10-28 22:11:01,Montreal,23\u00baC"
"2025-10-28 22:11:03,Los Angeles,25\u00baC"
"2025-10-28 22:11:05,Toronto,22\u00baC"
"2025-10-28 22:09:56,Montreal,21\u00baC"
"2025-10-28 22:10:00,Montreal,28\u00baC"
"2025-10-28 22:10:01,Montreal,25\u00baC"
"2025-10-28 22:10:57,New York,20\u00baC"
"2025-10-28 22:11:04,Montreal,21\u00baC"
"2025-10-28 22:11:00,Toronto,26\u00baC"
%6|1761705575.015|GETSUBSCRIPTIONS|rdkafka#consumer-1| [thrd:main]: Telemetry client instance id changed from AAAAAAAAAAAAAAAAAAAAAA to B8o/NBzdTy6EMSebUFfqeQ
"2025-10-28 22:09:53,Montreal,19\u00baC"
"2025-10-28 22:10:58,Montreal,23\u00baC"
"2025-10-28 22:10:59,Montreal,21\u00baC"
"2025-10-28 22:11:06,Montreal,26\u00baC"
"2025-10-28 22:09:58,Montreal,29\u00baC"
"2025-10-28 22:11:02,Los Angeles,18\u00baC"
"2025-10-28 22:09:54,Montreal,20\u00baC"
"2025-10-28 22:09:55,Montreal,22\u00baC"
% Reached end of 
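
The captured kcat output mixes JSON-encoded records with status and log lines that begin with `%`. If you want to post-process such output in Python, one approach is to skip the `%` lines and JSON-decode the rest. The sketch below assumes that convention holds for all non-record lines; `parse_kcat_lines` is a hypothetical helper, and the sample lines are abbreviated from the output above.

```python
from json import loads

def parse_kcat_lines(lines):
    """Yield decoded records, skipping blank lines and lines that start with '%'."""
    for line in lines:
        line = line.strip()
        if not line or line.startswith('%'):
            continue
        yield loads(line)

sample = [
    '"2025-10-28 22:11:00,Toronto,26\\u00baC"',
    '%6|1761705575.015|GETSUBSCRIPTIONS|rdkafka#consumer-1| [thrd:main]: Telemetry client instance id changed',
    '"2025-10-28 22:09:53,Montreal,19\\u00baC"',
]

for record in parse_kcat_lines(sample):
    print(record)
```

Decoding with `loads` also turns the `\u00ba` escape back into `º`, matching what the Python consumer prints.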