# Kafka Demo

### Connect to Kafka Broker Server 
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
See the Canvas Lab 2 assignment page for details on how to connect to the Kafka server.
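Before running the cells below, it can help to confirm that the SSH tunnel is actually forwarding traffic. The following is a minimal sketch, not part of the original lab: `port_open` is a hypothetical helper, and `9092` is assumed to be the `<local_port>` you chose above.

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures
        return False


if __name__ == "__main__":
    # Assumption: the tunnel forwards local port 9092 to the broker
    print("Tunnel reachable:", port_open("localhost", 9092))
```

A result of `False` usually means the tunnel is not running or a different local port was used. A result of `True` only shows that something is listening on the port, not that it is a Kafka broker.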

### To kill connection
```
lsof -ti:<local_port> | xargs kill -9
```

### Setup
```
python -m pip install kafka-python
```

In [2]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

# Update this for your own recitation section :)
topic = 'recitation-d-sarvesh'  # the section letter ('d' here) could be b, c, d, e, f

### Producer Mode -> Writes Data to Broker

In [4]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [TODO]: Set bootstrap_servers to the address of your Kafka bootstrap server
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                        value_serializer=lambda x: dumps(x).encode('utf-8'))

# [TODO]: Add cities of your choice
cities = ['Mumbai', 'Pune', 'Pittsburgh', 'New York']

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

# send() is asynchronous; block until all buffered messages have been delivered
producer.flush()

Writing to Kafka Broker
Writing: 2025-01-24 10:55:56,Pittsburgh,22ºC
Writing: 2025-01-24 10:55:57,Pittsburgh,25ºC
Writing: 2025-01-24 10:55:58,Mumbai,22ºC
Writing: 2025-01-24 10:55:59,New York,20ºC
Writing: 2025-01-24 10:56:00,Pune,24ºC
Writing: 2025-01-24 10:56:01,New York,28ºC
Writing: 2025-01-24 10:56:02,New York,28ºC
Writing: 2025-01-24 10:56:03,Mumbai,24ºC
Writing: 2025-01-24 10:56:04,Mumbai,30ºC
Writing: 2025-01-24 10:56:05,Mumbai,24ºC
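
The `value_serializer` passed to the producer JSON-encodes each value and then converts it to UTF-8 bytes. The sketch below reproduces that step without a broker to show what ends up on the wire; `serialize` and `deserialize` are hypothetical helper names used only for illustration.

```python
from json import dumps, loads


def serialize(value):
    # Same transformation as the producer's value_serializer
    return dumps(value).encode('utf-8')


def deserialize(raw):
    # Inverse step, as a consumer would apply to message.value
    return loads(raw.decode('utf-8'))


record = '2025-01-24 10:55:56,Pittsburgh,22ºC'
raw = serialize(record)

print(raw)               # JSON-quoted bytes; non-ASCII characters are escaped
print(deserialize(raw))  # the original string
```

Because `dumps` escapes non-ASCII characters by default, the `º` character is stored as `\u00ba`. Tools that print the raw bytes show the escaped form, while `loads` restores the original character.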


### Consumer Mode -> Reads Data from Broker

In [8]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# [TODO]: Fill in the parameters below using the Kafka documentation
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['localhost:9092'],
    # Where to start when no committed offset exists; experiment with 'earliest' and 'latest'
    auto_offset_reset='earliest',
    # Periodically commit the offsets that have been read
    enable_auto_commit=True,
    # How often (in ms) to commit read offsets to Kafka
    auto_commit_interval_ms=1000
)

print('Reading Kafka Broker')
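for message in consumer:
    # message.value is bytes by default: decode it, then parse the JSON payload
    record = loads(message.value.decode('utf-8'))
    print(record)
    # Append to the CSV log directly instead of passing message data to a shell
    with open('kafka_log.csv', 'a', encoding='utf-8') as f:
        f.write(record + '\n')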

Reading Kafka Broker
2025-01-23 23:53:18,Mumbai,31ºC
2025-01-23 23:53:19,Mumbai,28ºC
2025-01-23 23:53:20,Mumbai,20ºC
2025-01-23 23:53:21,Mumbai,22ºC
2025-01-23 23:53:22,Mumbai,24ºC
2025-01-23 23:53:23,Mumbai,27ºC
2025-01-23 23:53:24,Mumbai,25ºC
2025-01-23 23:53:25,Pune,22ºC
2025-01-23 23:53:26,Pune,29ºC
2025-01-23 23:53:27,Mumbai,30ºC
2025-01-23 23:54:57,Pittsburgh,18ºC
2025-01-23 23:54:58,Mumbai,22ºC
2025-01-23 23:54:59,Pittsburgh,24ºC
2025-01-23 23:55:00,Pittsburgh,22ºC
2025-01-23 23:55:01,Mumbai,18ºC
2025-01-23 23:55:02,Pittsburgh,21ºC
2025-01-23 23:55:03,Pittsburgh,20ºC
2025-01-23 23:55:04,Pune,26ºC
2025-01-23 23:55:05,Pittsburgh,21ºC
2025-01-23 23:55:06,Mumbai,27ºC
2025-01-23 23:55:14,Mumbai,29ºC
2025-01-23 23:55:15,Pittsburgh,19ºC
2025-01-23 23:55:16,Pune,21ºC
2025-01-23 23:55:17,New York,30ºC
2025-01-23 23:55:18,New York,26ºC
2025-01-23 23:55:19,Pune,25ºC
2025-01-23 23:55:20,Mumbai,22ºC
2025-01-23 23:55:21,New York,30ºC
2025-01-23 23:55:22,Pittsburgh,20ºC
2025-01-23 23:55:23,Pun

KeyboardInterrupt: 
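
Each consumed record is a comma-separated string of timestamp, city, and temperature. As a rough sketch of how such a record could be turned into typed fields for later analysis, the hypothetical `parse_record` helper below assumes the exact format written by the producer above, including the `ºC` suffix.

```python
from datetime import datetime

SUFFIX = 'ºC'  # same suffix the producer appends to the temperature


def parse_record(record):
    """Split 'YYYY-MM-DD HH:MM:SS,city,<temp>ºC' into (datetime, str, int)."""
    timestamp, city, temp = record.split(',')
    if not temp.endswith(SUFFIX):
        raise ValueError(f"unexpected temperature field: {temp!r}")
    return (
        datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S'),
        city,
        int(temp[:-len(SUFFIX)]),
    )


when, city, temp_c = parse_record('2025-01-23 23:53:18,Mumbai,31ºC')
print(when.isoformat(), city, temp_c)
```

Truncated or malformed records raise `ValueError`, so a consumer loop using this helper would need to catch that exception.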

# Use kcat!
kcat is a command-line interface (CLI) for Kafka, previously known as kafkacat.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [None]:
#kcat command: connect to local Kafka broker, specify a topic, and consume messages from the earliest offset
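
One possible shape for the command described above, sketched in Python so that the flags are spelled out. `kcat_consume_args` is a hypothetical helper; the broker address and topic are assumptions taken from the earlier cells. The flags used are `-b` (broker), `-t` (topic), `-C` (consumer mode), `-o beginning` (start from the earliest offset), and `-e` (exit after the last message).

```python
import shlex


def kcat_consume_args(broker, topic):
    """Build the argument list for consuming `topic` from the earliest offset."""
    return ['kcat', '-b', broker, '-t', topic, '-C', '-o', 'beginning', '-e']


# Assumed values: local end of the SSH tunnel and the topic defined earlier
args = kcat_consume_args('localhost:9092', 'recitation-d-sarvesh')
print(shlex.join(args))
```

The printed command can be pasted into a terminal or run from a notebook cell with a leading `!`. The same list can also be passed to `subprocess.run` if kcat is installed.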

In [11]:
!kcat -b localhost:9092 -L

%3|1737734923.419|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1737734924.424|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 1ms in state CONNECT, 1 identical error(s) suppressed)
%3|1737734925.428|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
% ERROR: Failed to acquire metadata: Local: Broker transport failure (Are the brokers reachable? Also try increasing the metadata timeout with -m <timeout>?)


In [12]:
!kcat -b localhost:9092 -t recitation-d-sarvesh -C

"2025-01-23 23:53:18,Mumbai,31\u00baC"
"2025-01-23 23:53:19,Mumbai,28\u00baC"
"2025-01-23 23:53:20,Mumbai,20\u00baC"
"2025-01-23 23:53:21,Mumbai,22\u00baC"
"2025-01-23 23:53:22,Mumbai,24\u00baC"
"2025-01-23 23:53:23,Mumbai,27\u00baC"
"2025-01-23 23:53:24,Mumbai,25\u00baC"
"2025-01-23 23:53:25,Pune,22\u00baC"
"2025-01-23 23:53:26,Pune,29\u00baC"
"2025-01-23 23:53:27,Mumbai,30\u00baC"
"2025-01-23 23:54:57,Pittsburgh,18\u00baC"
"2025-01-23 23:54:58,Mumbai,22\u00baC"
"2025-01-23 23:54:59,Pittsburgh,24\u00baC"
"2025-01-23 23:55:00,Pittsburgh,22\u00baC"
"2025-01-23 23:55:01,Mumbai,18\u00baC"
"2025-01-23 23:55:02,Pittsburgh,21\u00baC"
"2025-01-23 23:55:03,Pittsburgh,20\u00baC"
"2025-01-23 23:55:04,Pune,26\u00baC"
"2025-01-23 23:55:05,Pittsburgh,21\u00baC"
"2025-01-23 23:55:06,Mumbai,27\u00baC"
"2025-01-23 23:55:14,Mumbai,29\u00baC"
"2025-01-23 23:55:15,Pittsburgh,19\u00baC"
"2025-01-23 23:55:16,Pune,21\u00baC"
"2025-01-23 23:55:17,New York,30\u00baC"
"2025-01-23 23:55:18,New York,26\u00baC"
"