# Kafka Demo

### Connect to Kafka Broker Server
```
ssh -o ServerAliveInterval=60 -L 9092:localhost:9092 tunnel@128.2.204.215 -NTf
```
Password: `seaitunnel`


To kill the connection on port 9092:
```
lsof -ti:9092 | xargs kill -9
```
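
Before running the cells below, it can help to confirm that the tunnel is actually listening on local port 9092. The following is a minimal sketch using only the standard library; the helper name `is_port_open` and its default timeout are illustrative assumptions, not part of the original demo.

```python
import socket


def is_port_open(host: str = "localhost", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection raises OSError (refused, timed out, ...) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    status = "open" if is_port_open() else "closed"
    print(f"localhost:9092 is {status}")
```

A successful TCP connection only shows that something is listening on the port; it does not prove that the listener is a Kafka broker.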

### Setup
```
python -m pip install kafka-python
```

In [1]:
import os
from datetime import datetime
from json import dumps, loads
from random import randint
from time import sleep
# ssh -o ServerAliveInterval=60 -L 9092:localhost:9092 tunnel@128.2.24.106 -NTf
from kafka import KafkaConsumer, KafkaProducer

# Update this for your own recitation section :)
topic = 'recitation-c'

### Producer Mode -> Writes Data to Broker

In [7]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                        value_serializer=lambda x: dumps(x).encode('utf-8'),
                        )

cities = ['Pittsbruh', 'Tokyou', 'Sydnay']

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

# Block until all buffered messages have actually been sent
producer.flush()

Writing to Kafka Broker
Writing: 2024-01-26 11:43:51,Pittsbruh,30ºC
Writing: 2024-01-26 11:43:52,Tokyou,24ºC
Writing: 2024-01-26 11:43:53,Pittsbruh,19ºC
Writing: 2024-01-26 11:43:54,Tokyou,21ºC
Writing: 2024-01-26 11:43:55,Tokyou,26ºC
Writing: 2024-01-26 11:43:56,Pittsbruh,24ºC
Writing: 2024-01-26 11:43:57,Sydnay,29ºC
Writing: 2024-01-26 11:43:58,Sydnay,27ºC
Writing: 2024-01-26 11:43:59,Pittsbruh,19ºC
Writing: 2024-01-26 11:44:00,Pittsbruh,31ºC
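
To see what the `value_serializer` above actually puts on the wire, the sketch below applies the same transformation without a broker and then reverses it. The helper names `serialize` and `deserialize` are hypothetical and exist only for illustration.

```python
from json import dumps, loads


def serialize(value):
    """Same transformation as the producer's value_serializer."""
    return dumps(value).encode("utf-8")


def deserialize(raw: bytes):
    """Inverse step that a consumer can apply to message.value."""
    return loads(raw.decode("utf-8"))


record = "2024-01-26 11:43:51,Pittsbruh,30ºC"
payload = serialize(record)

print(type(payload).__name__)           # bytes
print(payload)                          # a JSON string literal, quotes included
print(deserialize(payload) == record)   # True
```

Because the value is JSON-encoded, the bytes on the topic include the surrounding double quotes and an escaped `º`. This is why the consumer needs both a decode step and `loads` to recover the original string. `KafkaConsumer` also accepts a `value_deserializer` argument that could perform this step automatically.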


### Consumer Mode -> Reads Data from Broker

In [5]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

consumer = KafkaConsumer(
    topic,
    bootstrap_servers='localhost:9092',
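    auto_offset_reset='earliest',  # Experiment with 'earliest' vs. 'latest'
    # Periodically commit the offsets that have been read
    # (kafka-python only commits offsets when a group_id is also set)
    enable_auto_commit=True,
    # How often, in milliseconds, to commit the read offsets to Kafka
    auto_commit_interval_ms=1000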
)

print('Reading Kafka Broker')
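with open('kafka_log.csv', 'a', encoding='utf-8') as log_file:
    for message in consumer:
        # message.value is bytes by default, so decode it first
        raw = message.value.decode('utf-8')
        try:
            record = loads(raw)  # the producer JSON-encoded each value
        except ValueError:
            record = raw  # not valid JSON; keep the raw text
        print(record)
        # Append with Python I/O rather than shelling out to `echo`
        log_file.write(f"{record}\n")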

Reading Kafka Broker
2024-01-24 21:54:22,New York City,29ºC
2024-01-24 21:54:23,Washington D.C.,21ºC
2024-01-24 21:54:24,Atlanta,25ºC
2024-01-24 21:54:25,New York City,22ºC
2024-01-24 21:54:26,Pittsburgh,31ºC
2024-01-24 21:54:27,Pittsburgh,27ºC
2024-01-24 21:54:28,Washington D.C.,31ºC
2024-01-24 21:54:29,New York City,27ºC
2024-01-24 21:54:30,Pittsburgh,25ºC
2024-01-24 21:54:31,New York City,19ºC
2024-01-25 10:33:04,Seattle,32ºC
2024-01-25 10:33:05,Seattle,24ºC
2024-01-25 10:33:06,Seattle,20ºC
2024-01-25 10:33:07,Seattle,29ºC
2024-01-25 10:33:08,Boston,18ºC
2024-01-25 10:33:09,Seattle,18ºC
2024-01-25 10:33:10,Seattle,32ºC
2024-01-25 10:33:11,Boston,28ºC
2024-01-25 10:33:12,Boston,20ºC
2024-01-25 10:33:13,Seattle,28ºC
2024-01-25 14:22:29,Pittsburgh,19ºC
2024-01-25 14:22:30,Pittsburgh,31ºC
2024-01-25 14:22:31,Atlanta,22ºC
2024-01-25 14:22:32,Pittsburgh,32ºC
2024-01-25 14:22:33,Pittsburgh,21ºC
2024-01-25 14:22:34,Pittsburgh,29ºC
2024-01-25 14:22:35,Pittsburgh,30ºC
2024-01-25 14:22:36,Pitt

KeyboardInterrupt: 
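
Each record read above is a comma-separated string, so a natural next step is to split it into typed fields. The sketch below assumes the `timestamp,city,temperature` layout shown in the output; the `Reading` type and the `parse_reading` helper are hypothetical names introduced for illustration.

```python
from datetime import datetime
from typing import NamedTuple


class Reading(NamedTuple):
    timestamp: datetime
    city: str
    temperature_c: int


def parse_reading(record: str) -> Reading:
    """Split 'YYYY-MM-DD HH:MM:SS,city,NNºC' into typed fields.

    Raises ValueError if the record is truncated or malformed.
    """
    timestamp_text, rest = record.split(",", 1)
    # Split from the right so the city name is kept whole
    city, temperature_text = rest.rsplit(",", 1)
    # Drop the trailing unit, accepting either 'º' or '°' before the 'C'
    temperature = int(temperature_text.rstrip("C").rstrip("º°"))
    timestamp = datetime.strptime(timestamp_text, "%Y-%m-%d %H:%M:%S")
    return Reading(timestamp, city, temperature)


reading = parse_reading("2024-01-24 21:54:23,Washington D.C.,21ºC")
print(reading.city, reading.temperature_c)
```

A truncated record, such as one cut off by an interrupt, raises `ValueError`, so callers can skip it instead of logging partial data.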

### Use kcat
kcat, previously known as kafkacat, is a command-line interface (CLI) for Kafka.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [None]:
# kcat command: connect to the local Kafka broker, specify a topic, and consume messages from the earliest offset

# kcat -b localhost:9092 -t recitation-c -o beginning
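
If you prefer to launch kcat from Python rather than a terminal, the command can be assembled as an argument list. This is a sketch that assumes `kcat` is installed and on your `PATH`; the helper `kcat_consume_command` is a hypothetical name, and the explicit `-C` flag (consumer mode) is an addition to the command shown above.

```python
import shlex
from typing import List


def kcat_consume_command(topic: str,
                         broker: str = "localhost:9092",
                         offset: str = "beginning") -> List[str]:
    """Build the kcat argument list for consuming `topic` from `offset`."""
    return ["kcat", "-C", "-b", broker, "-t", topic, "-o", offset]


command = kcat_consume_command("recitation-c")
print(shlex.join(command))
```

The resulting list can be passed to `subprocess.run(command)`, which avoids shell quoting issues when the topic name comes from a variable.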