# Kafka Demo

### Connect to the Kafka Broker Server
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
See the Lab 2 assignment page on Canvas for the values to substitute for the placeholders above.
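
Before running the cells below, it can help to confirm that the tunnel is actually listening. The following is a minimal sketch using only the standard library. The function name `port_is_open` is hypothetical, and port 9092 is an assumption taken from the `localhost:9092` address used in the later cells.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Assumption: the SSH tunnel forwards local port 9092.
    print("tunnel reachable:", port_is_open("localhost", 9092))
```

A `True` result only shows that something accepts TCP connections on that port; it does not prove the listener is a Kafka broker.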

### To kill the connection
```
lsof -ti:<local_port> | xargs kill -9
```

### Setup
```
python -m pip install kafka-python
```

In [5]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

# Update this for your own recitation section :)
topic = 'recitation-c'  # replace the final letter with your section: b, c, d, e, or f

### Producer Mode -> Writes Data to Broker

In [6]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [TODO]: Set bootstrap_servers to the address of your Kafka bootstrap server
# (localhost:<local_port> when using the SSH tunnel above)
producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                        value_serializer=lambda x: dumps(x).encode('utf-8'))

# [TODO]: Add cities of your choice
cities = ["Mumbai", "Pune", "Chicago", "Pittsburgh", "Boston", "Moline"]

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
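    producer.send(topic=topic, value=data)
    sleep(1)

# send() is asynchronous; flush() blocks until all buffered messages are delivered
producer.flush()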

Writing to Kafka Broker
Writing: 2025-01-23 11:20:36,Moline,26ºC
Writing: 2025-01-23 11:20:37,Chicago,22ºC
Writing: 2025-01-23 11:20:38,Pune,23ºC
Writing: 2025-01-23 11:20:39,Pittsburgh,29ºC
Writing: 2025-01-23 11:20:40,Pune,23ºC
Writing: 2025-01-23 11:20:41,Moline,24ºC
Writing: 2025-01-23 11:20:42,Moline,23ºC
Writing: 2025-01-23 11:20:43,Chicago,29ºC
Writing: 2025-01-23 11:20:44,Moline,20ºC
Writing: 2025-01-23 11:20:45,Chicago,24ºC
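
The `value_serializer` above JSON-encodes each string before sending it, so the bytes stored on the topic carry surrounding quotes and an escaped `\u00ba` in place of `º`. The sketch below reproduces that round trip without a broker, using one line from the output above as sample input. The names `serialize` and `deserialize` are illustrative and not part of kafka-python.

```python
from json import dumps, loads


def serialize(value):
    # Same expression as the producer's value_serializer.
    return dumps(value).encode("utf-8")


def deserialize(raw):
    # Inverse step that a consumer has to apply to message.value.
    return loads(raw.decode("utf-8"))


record = "2025-01-23 11:20:36,Moline,26ºC"
raw = serialize(record)
print(raw)               # JSON text as bytes: quoted, with º escaped as \u00ba
print(deserialize(raw))  # the original string again
```

The escaping comes from `json.dumps`, which emits ASCII-only output by default; decoding with `loads` restores the original character.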


### Consumer Mode -> Reads Data from Broker

In [10]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# [TODO]: Fill in the parameters below using the Kafka documentation
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=["localhost:9092"],
    # Where to start reading when no committed offset exists: "earliest" or "latest"
    auto_offset_reset="latest",  # Experiment with different values
    # Periodically commit the offsets that have been read
    # (kafka-python disables offset commits when no group_id is set)
    enable_auto_commit=True,
    # How often, in milliseconds, to commit offsets to Kafka
    auto_commit_interval_ms=1000
)

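print('Reading Kafka Broker')
# Append each record to a CSV log using Python file I/O
with open("kafka_log.csv", "a", encoding="utf-8") as log_file:
    for message in consumer:
        # message.value is bytes by default: decode it, then parse the JSON string
        record = loads(message.value.decode("utf-8"))
        print(record)
        log_file.write(f"{record}\n")
        log_file.flush()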

Reading Kafka Broker
2025-01-23 11:27:52,Mumbai,28ºC
2025-01-23 11:27:53,Pune,28ºC
2025-01-23 11:27:54,Chicago,19ºC
2025-01-23 11:27:55,Pittsburgh,22ºC
2025-01-23 11:27:56,Chicago,20ºC
2025-01-23 11:27:57,Pune,26ºC
2025-01-23 11:27:58,Pune,31ºC
2025-01-23 11:27:59,Mumbai,32ºC
2025-01-23 11:28:00,Moline,26ºC
2025-01-23 11:28:01,Pune,21ºC
2025-01-23 11:29:15,New York,23ºC
2025-01-23 11:29:16,Paris,29ºC
2025-01-23 11:29:17,Tokyo,26ºC
2025-01-23 11:29:18,Sydney,32ºC
2025-01-23 11:29:19,London,22ºC
2025-01-23 11:29:20,London,23ºC
2025-01-23 11:29:21,Paris,18ºC
2025-01-23 11:29:22,Sydney,24ºC
2025-01-23 11:29:23,Tokyo,32ºC
2025-01-23 11:29:24,New York,19ºC


KeyboardInterrupt: 
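
Each consumed value is a single comma-separated string, so downstream code usually needs to split it into fields. The sketch below is one way to do that, assuming the `timestamp,city,temperature` layout written by the producer cell. `Reading` and `parse_reading` are hypothetical names. The function returns `None` for payloads that are not JSON or do not match the layout, since a topic shared by a whole recitation may contain free-form messages.

```python
from dataclasses import dataclass
from datetime import datetime
from json import JSONDecodeError, loads
from typing import Optional


@dataclass
class Reading:
    timestamp: datetime
    city: str
    temp_c: int


def parse_reading(raw: bytes) -> Optional[Reading]:
    """Decode one message value; return None if it does not fit the layout."""
    try:
        text = loads(raw.decode("utf-8"))
        stamp, city, temp = text.split(",")
        if not temp.endswith("ºC"):
            return None
        return Reading(
            timestamp=datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S"),
            city=city,
            temp_c=int(temp[:-2]),  # drop the trailing "ºC"
        )
    except (UnicodeDecodeError, JSONDecodeError, AttributeError, ValueError):
        # Not UTF-8, not JSON, not a string, or the wrong number/format of fields.
        return None


print(parse_reading(b'"2025-01-23 11:27:52,Mumbai,28\\u00baC"'))
print(parse_reading(b"hi, this is arthur"))
```

Inside the consumer loop, this could be called as `parse_reading(message.value)`, skipping any message for which it returns `None`.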

### Use kcat!
kcat, previously known as kafkacat, is a command-line interface (CLI) for producing to and consuming from Kafka.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [12]:
# kcat: connect to the local Kafka broker (-b), select a topic (-t), and consume (-C) from the earliest offset (-o beginning)
!kcat -b localhost:9092 -t recitation-c -C -o beginning


"2025-01-23 09:30:12,Taipei,27\u00baC"
"2025-01-23 09:30:14,Taipei,24\u00baC"
"2025-01-23 09:30:15,Taipei,32\u00baC"
"2025-01-23 09:30:16,Taichung,22\u00baC"
"2025-01-23 09:30:17,Taichung,29\u00baC"
"2025-01-23 09:30:18,Taipei,26\u00baC"
"2025-01-23 09:30:19,Taipei,31\u00baC"
"2025-01-23 09:30:20,Taichung,29\u00baC"
"2025-01-23 09:30:21,Taichung,20\u00baC"
"2025-01-23 09:30:22,Taipei,19\u00baC"
hi, this is arthur
"2025-01-23 10:02:43,Taipei,21\u00baC"
"2025-01-23 10:02:44,Taipei,19\u00baC"
"2025-01-23 10:02:45,Taichung,30\u00baC"
"2025-01-23 10:02:46,Taipei,32\u00baC"
"2025-01-23 10:02:47,Taichung,27\u00baC"
"2025-01-23 10:02:48,Taichung,25\u00baC"
"2025-01-23 10:02:49,Taichung,18\u00baC"
"2025-01-23 10:02:50,Taichung,20\u00baC"
"2025-01-23 10:02:51,Taichung,25\u00baC"
"2025-01-23 10:02:52,Taichung,32\u00baC"
2025-01-23 10:12:41,Arthur,36ºC
"2025-01-23 11:20:36,Moline,26\u00baC"
"2025-01-23 11:20:37,Chicago,22\u00baC"
"2025-01-23 11:20:38,Pune,23\u00baC"
"2025-01-23 11:20:39,Pittsburgh