# Kafka Demo

### Connect to Kafka Broker Server
```
ssh -o ServerAliveInterval=60 -L 9092:localhost:9092 tunnel@128.2.204.215   
```
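When prompted, enter the password `seaitunnel`. The `-L 9092:localhost:9092` option forwards local port 9092 to port 9092 on the server, so the broker is reachable at `localhost:9092`.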


To kill whatever process is holding port 9092 (for example, a stale tunnel):
```
lsof -ti:9092 | xargs kill -9
```
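
Before running the cells below, it can help to confirm that something is listening on the forwarded port. The following is a minimal sketch using only the standard library; the helper name `port_is_open` and the one-second timeout are illustrative choices, not part of the demo. It only checks that a TCP connection succeeds, not that a Kafka broker is on the other end.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection performs the TCP handshake and raises OSError on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # localhost:9092 is the local end of the SSH tunnel opened above
    print("tunnel reachable:", port_is_open("localhost", 9092))
```

If this prints `False`, the tunnel is probably not running, or the port was freed by the kill command.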

### Setup
```
python -m pip install kafka-python
```

```python
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

# Update this for your own recitation section :)
topic = 'recitation-d'
```

### Producer Mode -> Writes Data to Broker

```python
# Create a producer to write data to Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# Address of the Kafka bootstrap server (the SSH tunnel exposes it at localhost:9092)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         # Serialize each value as JSON, then encode it as UTF-8 bytes
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         )

# [TODO]: Add cities of your choice
cities = ["Boston", "Philadelphia", "Trenton", "Baltimore", "Austin"]

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

# send() is asynchronous; flush() blocks until buffered messages have been sent
producer.flush()
```

```
Writing to Kafka Broker
Writing: 2024-01-26 11:15:06,Baltimore,31ºC
Writing: 2024-01-26 11:15:07,Austin,21ºC
Writing: 2024-01-26 11:15:08,Trenton,20ºC
Writing: 2024-01-26 11:15:09,Trenton,29ºC
Writing: 2024-01-26 11:15:10,Trenton,27ºC
Writing: 2024-01-26 11:15:11,Boston,25ºC
Writing: 2024-01-26 11:15:12,Austin,29ºC
Writing: 2024-01-26 11:15:13,Philadelphia,32ºC
Writing: 2024-01-26 11:15:14,Trenton,19ºC
Writing: 2024-01-26 11:15:15,Austin,20ºC
```
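
The `value_serializer` used by the producer turns each value into JSON text and then into UTF-8 bytes, which is what the broker stores. The sketch below reproduces that step without a broker and reverses it the way a consumer would. The function names `serialize` and `deserialize` are hypothetical helpers introduced for illustration.

```python
from json import dumps, loads


def serialize(value):
    """Mirror the producer's value_serializer: JSON text encoded as UTF-8 bytes."""
    return dumps(value).encode("utf-8")


def deserialize(payload):
    """Reverse serialize(): decode the bytes, then parse the JSON text."""
    return loads(payload.decode("utf-8"))


record = "2024-01-26 11:15:06,Baltimore,31ºC"
payload = serialize(record)

# The payload is a quoted JSON string; dumps escapes non-ASCII characters by default
print(payload)
# Parsing the payload recovers the original string
print(deserialize(payload) == record)
```

Because the payload is JSON, a consumer that calls `loads` on it only works for messages written with this serializer.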


### Consumer Mode -> Reads Data from Broker

```python
# Create a consumer to read data from Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['localhost:9092'],
    # Where to start when no committed offset exists; experiment with 'earliest' and 'latest'
    auto_offset_reset='earliest',
    # Periodically commit the offsets that have been read
    # (kafka-python only commits offsets when a group_id is set)
    enable_auto_commit=True,
    # How often, in milliseconds, to commit offsets to Kafka
    auto_commit_interval_ms=1000,
)

print('Reading Kafka Broker')
with open('kafka_log.csv', 'a', encoding='utf-8') as log_file:
    for message in consumer:
        # message.value is bytes by default, so decode it before parsing the JSON
        record = loads(message.value.decode('utf-8'))
        print(record)
        # Append with plain file I/O; passing message text to a shell via os.system is unsafe
        log_file.write(f"{record}\n")
```

Reading Kafka Broker
2024-01-24 14:27:49,London,22ºC
2024-01-24 14:27:51,London,23ºC
2024-01-24 14:27:52,New York,23ºC
2024-01-24 14:27:53,London,21ºC
2024-01-24 14:27:54,Pittsburgh,24ºC
2024-01-24 14:27:55,Pittsburgh,30ºC
2024-01-24 14:27:56,London,27ºC
2024-01-24 14:27:57,London,32ºC
2024-01-24 14:27:58,London,31ºC
2024-01-24 14:27:59,London,25ºC
2024-01-24 14:38:17,New York,21ºC
2024-01-24 14:38:18,Bangalore,20ºC
2024-01-24 14:38:19,London,22ºC
2024-01-24 14:38:20,Bangalore,30ºC
2024-01-24 14:38:21,Bangalore,26ºC
2024-01-24 14:38:22,Pittsburgh,20ºC
2024-01-24 14:38:23,New York,19ºC
2024-01-24 14:38:24,London,26ºC
2024-01-24 14:38:25,London,25ºC
2024-01-24 14:38:26,Bangalore,26ºC
2024-01-24 16:26:20,Seattle,31ºC
2024-01-24 16:26:21,Beijing,18ºC
2024-01-24 16:26:22,Montreal,26ºC
2024-01-24 16:26:23,Montreal,20ºC
2024-01-24 16:26:24,Tokyo,24ºC
2024-01-24 16:26:25,Montreal,24ºC
2024-01-24 16:26:26,Beijing,21ºC
2024-01-24 16:26:27,Seattle,32ºC
2024-01-24 16:26:28,Montreal,24ºC
2024-01-24

2024-01-25 17:32:18,shanghai,29ºC
2024-01-25 17:32:19,beijing,23ºC
2024-01-25 17:32:20,shanghai,27ºC
2024-01-25 17:32:21,beijing,24ºC
2024-01-25 17:32:22,beijing,32ºC
2024-01-25 17:32:23,beijing,30ºC
2024-01-25 17:32:24,beijing,29ºC
2024-01-25 17:32:25,shanghai,23ºC
2024-01-25 17:32:26,shanghai,28ºC
2024-01-25 17:46:19,Pittsburgh,23ºC
2024-01-25 17:46:20,London,26ºC
2024-01-25 17:46:21,Pittsburgh,28ºC
2024-01-25 17:46:22,New York,25ºC
2024-01-25 17:46:23,London,25ºC
2024-01-25 17:46:24,Bangalore,23ºC
2024-01-25 17:46:25,Bangalore,24ºC
2024-01-25 17:46:26,New York,20ºC
2024-01-25 17:46:27,London,19ºC
2024-01-25 17:46:28,New York,19ºC
2024-01-25 17:51:06,beijing,18ºC
2024-01-25 17:51:07,beijing,26ºC
2024-01-25 17:51:08,beijing,30ºC
2024-01-25 17:51:09,shanghai,22ºC
2024-01-25 17:51:10,beijing,21ºC
2024-01-25 17:51:11,beijing,19ºC
2024-01-25 17:51:12,beijing,30ºC
2024-01-25 17:51:13,beijing,32ºC
2024-01-25 17:51:14,shanghai,25ºC
2024-01-25 17:51:15,shanghai,25ºC
2024-01-25 18:25:38,Baltim

2024-01-25 21:26:32,pittsburgh,26ºC
2024-01-25 21:26:33,pittsburgh,24ºC
2024-01-25 21:26:34,new york city,32ºC
2024-01-25 21:26:35,pittsburgh,30ºC
2024-01-25 21:26:36,pittsburgh,27ºC
2024-01-25 21:26:37,pittsburgh,26ºC
2024-01-25 21:30:45,CityA,20ºC
2024-01-25 21:30:46,CityB,27ºC
2024-01-25 21:30:47,CityA,18ºC
2024-01-25 21:30:48,CityC,21ºC
2024-01-25 21:30:49,CityB,22ºC
2024-01-25 21:30:50,CityC,25ºC
2024-01-25 21:30:51,CityC,27ºC
2024-01-25 21:30:52,CityB,28ºC
2024-01-25 21:30:53,CityC,19ºC
2024-01-25 21:30:54,CityB,18ºC
2024-01-25 22:21:35,Chicago,30ºC
2024-01-25 22:21:36,San Francisco,26ºC
2024-01-25 22:21:37,Pittsburgh,18ºC
2024-01-25 22:21:38,Chicago,32ºC
2024-01-25 22:21:39,Las Vegas,30ºC
2024-01-25 22:21:40,Las Vegas,31ºC
2024-01-25 22:21:41,Orlando,23ºC
2024-01-25 22:21:42,Pittsburgh,18ºC
2024-01-25 22:21:43,Las Vegas,24ºC
2024-01-25 22:21:44,Orlando,26ºC
2024-01-25 22:22:13,Michigan,22ºC
2024-01-25 22:22:14,Michigan,26ºC
2024-01-25 22:22:15,Las Vegas,31ºC
2024-01-25 22:22:16,

2024-01-26 00:29:11,jodhpur,24ºC
2024-01-26 00:29:12,jodhpur,20ºC
2024-01-26 00:29:13,jodhpur,23ºC
2024-01-26 00:29:14,jodhpur,28ºC
2024-01-26 00:59:51,Jamshedpur,26ºC
2024-01-26 00:59:52,Hokagetown,19ºC
2024-01-26 00:59:53,Pittsburgh,18ºC
2024-01-26 00:59:54,Pittsburgh,21ºC
2024-01-26 00:59:55,Leaf village,20ºC
2024-01-26 00:59:56,Jamshedpur,28ºC
2024-01-26 00:59:57,Leaf village,25ºC
2024-01-26 00:59:58,Pittsburgh,27ºC
2024-01-26 00:59:59,Jamshedpur,18ºC
2024-01-26 01:00:00,Hokagetown,21ºC
2024-01-26 01:15:47,San Jose,29ºC
2024-01-26 01:15:48,Miami,30ºC
2024-01-26 01:15:49,Miami,24ºC
2024-01-26 01:15:50,San Jose,21ºC
2024-01-26 01:15:51,San Jose,18ºC
2024-01-26 01:15:52,San Jose,18ºC
2024-01-26 01:15:53,San Jose,18ºC
2024-01-26 01:15:54,San Jose,24ºC
2024-01-26 01:15:55,Miami,20ºC
2024-01-26 01:15:56,Miami,31ºC
2024-01-26 02:08:45,Los Angeles,31ºC
2024-01-26 02:08:46,Mumbai,21ºC
2024-01-26 02:08:47,Los Angeles,23ºC
2024-01-26 02:08:48,New York,23ºC
2024-01-26 02:08:49,New York,26ºC
20

2024-01-26 10:34:31,City1,29ºC
2024-01-26 10:34:31,Jinja,20ºC
2024-01-26 10:34:32,City2,31ºC
2024-01-26 10:34:33,City1,31ºC
2024-01-26 10:34:34,City3,20ºC
2024-01-26 10:34:35,City1,25ºC
2024-01-26 10:34:36,City3,32ºC
2024-01-26 10:34:37,City1,27ºC
2024-01-26 10:34:38,City1,28ºC
2024-01-26 11:07:22,Seattle,31ºC
2024-01-26 11:07:23,Boston,26ºC
2024-01-26 11:07:24,Seattle,28ºC
2024-01-26 11:07:25,Seattle,27ºC
2024-01-26 11:07:26,Boston,21ºC
2024-01-26 11:07:27,Tampa,20ºC
2024-01-26 11:07:28,Seattle,31ºC
2024-01-26 11:07:29,Tampa,30ºC
2024-01-26 11:07:30,Philadelphia,24ºC
2024-01-26 11:07:31,Philadelphia,22ºC
2024-01-26 11:08:34,Pittsburgh,20ºC
2024-01-26 11:08:35,Los Angeles,30ºC
2024-01-26 11:08:36,Los Angeles,26ºC
2024-01-26 11:08:37,Los Angeles,29ºC
2024-01-26 11:08:38,Pittsburgh,22ºC
2024-01-26 11:08:39,Pittsburgh,18ºC
2024-01-26 11:08:40,Los Angeles,19ºC
2024-01-26 11:08:41,Los Angeles,21ºC
2024-01-26 11:08:42,Pittsburgh,26ºC
2024-01-26 11:08:43,Los Angeles,22ºC
2024-01-26 11:09:22,L

2024-01-26 11:35:13,Los Angeles,28ºC
2024-01-26 11:35:14,Los Angeles,32ºC
2024-01-26 11:35:15,Mars,30ºC
2024-01-26 11:35:15,Los Angeles,18ºC
2024-01-26 11:35:16,London,25ºC
2024-01-26 11:35:16,Topeka,23ºC
2024-01-26 11:35:16,Los Angeles,22ºC
2024-01-26 11:35:17,Mars,26ºC
2024-01-26 11:35:17,San Francisco,26ºC
2024-01-26 11:35:18,Paris,29ºC
2024-01-26 11:35:18,London,22ºC
2024-01-26 11:35:19,Mars,20ºC
2024-01-26 11:35:19,Mumbai,32ºC
2024-01-26 11:35:20,Paris,20ºC
2024-01-26 11:35:20,London,25ºC
2024-01-26 11:35:21,Paris,31ºC
2024-01-26 11:35:21,Pittsburgh,28ºC
2024-01-26 11:35:22,Tokyo,22ºC
2024-01-26 11:35:22,Mumbai,32ºC
2024-01-26 11:35:23,Sydney,21ºC
2024-01-26 11:35:23,Topeka,22ºC
2024-01-26 11:35:24,Tokyo,18ºC
2024-01-26 11:35:24,London,19ºC
2024-01-26 11:35:25,New York,19ºC
2024-01-26 11:38:25,Tokyo,26ºC
2024-01-26 11:38:26,Mars,29ºC
2024-01-26 11:38:27,Paris,19ºC
2024-01-26 11:38:28,Tokyo,23ºC
2024-01-26 11:38:29,Tokyo,26ºC
2024-01-26 11:38:30,Sydney,30ºC
2024-01-26 11:38:31,New 

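JSONDecodeError: Expecting value: line 1 column 1 (char 0)

`loads` raises this error when a payload does not start with a valid JSON value, for example an empty message or text that was written without the JSON serializer.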
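
The `JSONDecodeError` above stops the consumer loop at the first payload that is not valid JSON. One way to keep reading is to parse each payload defensively and skip the ones that fail. This is a minimal sketch, not the demo's own code: `parse_or_none` is a hypothetical helper, and the sample payloads are made up to show both outcomes.

```python
from json import JSONDecodeError, loads
from typing import Any, Optional


def parse_or_none(payload: bytes) -> Optional[Any]:
    """Decode and parse one payload; return None if it is not valid UTF-8 JSON.

    Note: a payload containing the JSON literal null also yields None.
    """
    try:
        return loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, JSONDecodeError):
        return None


# Illustrative payloads: one JSON string, one empty message, one raw text message
samples = [b'"2024-01-26 11:15:06,Baltimore,31\\u00baC"', b"", b"plain text"]
for raw in samples:
    record = parse_or_none(raw)
    if record is None:
        print(f"skipping non-JSON payload: {raw!r}")
    else:
        print(record)
```

Inside the consumer loop, the same helper could be called on `message.value`, with `continue` when it returns `None`.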

### Use kcat
kcat, previously known as kafkacat, is a command-line interface (CLI) for producing to and consuming from Kafka.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

```
# kcat command: connect to the local Kafka broker, specify a topic, and consume messages from the earliest offset
```
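
One possible form of the command described above, sketched in Python so it can be assembled from the same broker address and topic used earlier. The flags are kcat's `-b` (broker list), `-t` (topic), `-C` (consumer mode), and `-o beginning` (start from the earliest offset). The helper name `kcat_consume_command` is an illustrative assumption, and kcat must be installed separately to run the printed command.

```python
import shlex
from typing import List


def kcat_consume_command(broker: str, topic: str) -> List[str]:
    """Build the argument list for consuming a topic from its earliest offset."""
    return ["kcat", "-b", broker, "-t", topic, "-C", "-o", "beginning"]


# Broker address and topic match the values used in the cells above
args = kcat_consume_command("localhost:9092", "recitation-d")
# shlex.join renders the list as a shell-safe command line to paste into a terminal
print(shlex.join(args))
```

Keeping the arguments as a list also makes it possible to pass them to `subprocess.run` without going through a shell.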

