# Kafka Demo

### Connect to Kafka Broker Server 
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
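For example, to forward local port 9092 to port 9092 on the remote server: `ssh -L 9092:localhost:9092 user@example.com -NTf`. Here `-N` runs no remote command, `-T` disables pseudo-terminal allocation, and `-f` sends `ssh` to the background.

The connection details for the Kafka server are on the Canvas Lab 2 assignment page.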
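
Before running the cells below, it can help to confirm that the tunnel is actually listening. The following is a minimal sketch using only the standard library; the helper name `port_is_open` is hypothetical, and the host and port are assumed to match the example tunnel (`localhost:9092`). It only checks that something accepts TCP connections on the port, not that it is a Kafka broker.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name-resolution failures
        return False


# Usage (assumes the tunnel forwards local port 9092):
# print(port_is_open("localhost", 9092))
```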

### To kill the connection
```
lsof -ti:<local_port> | xargs kill -9
```
### Setup
```
python -m pip install kafka-python
```

In [8]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

# Update this for your own recitation section :)
topic = 'recitation-x' # x could be b, c, d, e, f

### Producer Mode -> Writes Data to Broker

In [11]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [TODO]: Set bootstrap_servers to the address of your Kafka bootstrap server
# (localhost:9092 works when the SSH tunnel forwards local port 9092)
producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                        value_serializer=lambda x: dumps(x).encode('utf-8'))

# [TODO]: Add cities of your choice
cities = ["Bogota", "Quito", "Lima"]
#cities = ["London", "Berlin", "Paris", "Madrid", "Rome", "Tokyo"]

# Write data via the producer
print("Writing to Kafka Broker")
for _ in range(9):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(10, 20)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

# send() is asynchronous; flush() blocks until all buffered records are delivered
producer.flush()

Writing to Kafka Broker
Writing: 2025-03-16 15:49:03,Bogota,17ºC
Writing: 2025-03-16 15:49:04,Lima,18ºC
Writing: 2025-03-16 15:49:05,Bogota,20ºC
Writing: 2025-03-16 15:49:06,Quito,12ºC
Writing: 2025-03-16 15:49:07,Quito,12ºC
Writing: 2025-03-16 15:49:08,Bogota,14ºC
Writing: 2025-03-16 15:49:09,Lima,10ºC
Writing: 2025-03-16 15:49:10,Quito,19ºC
Writing: 2025-03-16 15:49:11,Quito,11ºC
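
The "Writing:" lines above only show that `send()` was called; in kafka-python, `send()` returns a future, and the broker's acknowledgement arrives later. As a hedged sketch, the hypothetical helper below (`send_and_confirm` is not part of the library) blocks on that future and returns where the record landed. It assumes `producer` is a `KafkaProducer` like the one created above.

```python
def send_and_confirm(producer, topic, value, timeout=10):
    """Send one record and block until the broker acknowledges it.

    Returns (topic, partition, offset) taken from the record metadata.
    """
    future = producer.send(topic, value=value)
    # get() waits for the acknowledgement and raises if delivery failed
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset


# Usage (assumes the producer and topic defined in the cells above):
# print(send_and_confirm(producer, topic, "2025-03-16 15:49:03,Bogota,17ºC"))
```

Blocking on every record is slower than the fire-and-forget loop, so this is mainly useful for debugging whether records reach the broker at all.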


### Consumer Mode -> Reads Data from Broker

In [10]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# [TODO]: Adjust the parameters/arguments below using the Kafka documentation
consumer = KafkaConsumer(
    topic, # Topic name (defined in the setup cell)
    bootstrap_servers=["localhost:9092"], # Kafka broker address
    auto_offset_reset="earliest", # Experiment with different values
    #auto_offset_reset="latest", # Experiment with different values
    # Periodically commit the offsets that have been read.
    # Note: kafka-python only commits offsets when a group_id is set.
    enable_auto_commit=True,
    # How often (in ms) to tell Kafka which offset has been read
    auto_commit_interval_ms=1000
)

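print('Reading Kafka Broker')
for message in consumer:
    # message.value is bytes by default: decode it, then parse the JSON payload
    record = loads(message.value.decode('utf-8'))
    print(record)
    # Append the decoded record to the log file without going through the shell
    with open('kafka_log.csv', 'a', encoding='utf-8') as log_file:
        log_file.write(record + '\n')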

Reading Kafka Broker
2025-03-16 15:46:20,Lima,18ºC
2025-03-16 15:46:21,Lima,19ºC
2025-03-16 15:46:22,Lima,12ºC
2025-03-16 15:46:23,Bogota,13ºC
2025-03-16 15:46:24,Quito,11ºC
2025-03-16 15:46:25,Quito,20ºC
2025-03-16 15:46:26,Lima,10ºC
2025-03-16 15:46:27,Quito,20ºC
2025-03-16 15:46:28,Quito,18ºC


KeyboardInterrupt: 
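
The consumer loop above blocks until it is interrupted by hand, which is where the `KeyboardInterrupt` comes from. One alternative, sketched here under assumptions, is to pass kafka-python's `consumer_timeout_ms` option so that iteration stops once no message arrives within that window. The helper name `drain` and the 5000 ms value are illustrative choices, not part of the original notebook.

```python
from json import loads


def drain(consumer, max_messages=None):
    """Collect decoded records from an iterable of Kafka messages.

    Stops when the iterable is exhausted (for a KafkaConsumer, that happens
    when consumer_timeout_ms elapses) or when max_messages is reached.
    """
    records = []
    for message in consumer:
        # message.value is bytes; decode it and undo the producer's JSON encoding
        records.append(loads(message.value.decode("utf-8")))
        if max_messages is not None and len(records) >= max_messages:
            break
    return records


# Usage (assumes kafka-python is installed and the tunnel is up):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer(
#     "recitation-x",
#     bootstrap_servers=["localhost:9092"],
#     auto_offset_reset="earliest",
#     consumer_timeout_ms=5000,  # stop iterating after 5 s without messages
# )
# print(drain(consumer))
```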

### Use kcat!
kcat is a command-line interface (CLI) for Kafka, previously known as kafkacat.

Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [None]:
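# kcat command: consume (-C) from topic recitation-x (-t), starting at the earliest offset (-o beginning), via the broker at 127.0.0.1:9092 (-b)
# Note: inside a Docker container, 127.0.0.1 refers to the container itself, so reaching a tunnel on the host requires host networking (e.g. --network host on Linux)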
!docker run --rm edenhill/kcat:1.7.1 -C -b 127.0.0.1:9092 -t recitation-x -o beginning

In [None]:
!docker run -it --rm edenhill/kcat:1.7.1 -C -b localhost:9092 -t recitation-x -o beginning

In [None]:
# kcat command: same consumer, but targeting 172.17.0.2:9092, an address on Docker's default bridge network (typically the first container started on it)
!docker run --rm edenhill/kcat:1.7.1 -C -b 172.17.0.2:9092 -t recitation-x -o beginning

In [None]:
# kcat command: list (-L) cluster metadata (brokers, topics, partitions)
!docker run --rm edenhill/kcat:1.7.1 -L -b 172.17.0.2:9092

In [None]:
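# kcat command: consume from the beginning, exit after the last message (-e), and save the output to messages.txt
!docker run --rm edenhill/kcat:1.7.1 -C -b 172.17.0.2:9092 -t recitation-x -o beginning -e > messages.txt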
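
Once the messages are saved to a file, they can be turned into structured rows. The sketch below assumes each line of `messages.txt` is one JSON-encoded string in the `timestamp,city,temperature` format written by the producer above. The function names `parse_record` and `load_records` are hypothetical.

```python
import json
from datetime import datetime


def parse_record(line):
    """Parse one JSON-encoded 'timestamp,city,temperature' line into a tuple."""
    text = json.loads(line)  # undo the producer's JSON encoding
    timestamp, city, temperature = text.split(",")
    return (
        datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S"),
        city,
        int(temperature.rstrip("ºC")),  # drop the unit suffix, keep the number
    )


def load_records(path="messages.txt"):
    """Read every non-empty line of the file and parse it."""
    with open(path, encoding="utf-8") as handle:
        return [parse_record(line) for line in handle if line.strip()]


# Usage (assumes messages.txt was produced by the kcat command above):
# for timestamp, city, temperature in load_records():
#     print(timestamp.isoformat(), city, temperature)
```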