# Kafka Demo

### Connect to Kafka Broker Server
```
ssh -o ServerAliveInterval=60 -L 9092:localhost:9092 tunnel@128.2.204.215 -NTf
```
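Password (when prompted): `seaitunnel`

This forwards local port 9092 to port 9092 on the remote host, so `localhost:9092` in the code below reaches the broker. `-N` runs no remote command, `-T` disables terminal allocation, `-f` sends ssh to the background, and `ServerAliveInterval=60` sends a keep-alive every 60 seconds.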


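To close the tunnel, kill the process(es) using port 9092. This matches both ends of any connection on that port, so it can also kill a Jupyter kernel that is still connected to the broker: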
```
lsof -ti:9092 | xargs kill -9
```
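Before running the notebook cells, it can help to confirm that something is listening on the local end of the tunnel. A minimal sketch using only the standard library; `is_port_open` is a hypothetical helper, and a successful TCP connect only shows that the port accepts connections, not that a healthy Kafka broker is behind it.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Hypothetical helper: True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

With the tunnel running, `is_port_open("localhost", 9092)` should return `True`; `False` usually means the ssh command needs to be re-run.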

### Setup
```
python -m pip install kafka-python
```

```python
from datetime import datetime
from json import dumps, loads
from random import randint
from time import sleep
import os

# Requires the SSH tunnel above to be running (localhost:9092 -> broker)
from kafka import KafkaConsumer, KafkaProducer

# Update this for your demo, otherwise you'll see my data :)
topic = 'test_ranadeep_2'
```
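One way to avoid sharing a topic by accident is to derive the name from your login. A minimal sketch, assuming a `test_<user>` naming scheme (the scheme and the `personal_topic` helper are hypothetical, not part of the demo). It relies on Kafka's topic-name rules: only ASCII letters, digits, `.`, `_` and `-`, at most 249 characters.

```python
import getpass
import re
from typing import Optional

# Kafka topic names may only use ASCII letters, digits, '.', '_' and '-',
# and are limited to 249 characters.
_ILLEGAL_CHARS = re.compile(r"[^a-zA-Z0-9._-]")
MAX_TOPIC_LEN = 249


def personal_topic(prefix: str = "test", user: Optional[str] = None) -> str:
    """Hypothetical helper: build a per-user topic name such as 'test_jdoe'."""
    if user is None:
        user = getpass.getuser()  # login name of the current OS user
    # Replace any character Kafka would reject with '_'
    name = f"{prefix}_{_ILLEGAL_CHARS.sub('_', user)}"
    return name[:MAX_TOPIC_LEN]
```

In the notebook you would then write `topic = personal_topic()` instead of hard-coding the name.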

### Producer Mode -> Writes Data to Broker

```python
# Create a producer to write data to Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # Encode each value as JSON, then as UTF-8 bytes
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)
cities = ['Pittsburgh', 'New York', 'London', 'Bangalore', 'Shanghai', 'Tokyo', 'Munich']

# Write 10 readings, one per second, as "timestamp,city,temperature"
print("Writing to Kafka Broker")
for _ in range(10):
    city = cities[randint(0, len(cities) - 1)]
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{city},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    # send() is asynchronous: it queues the record and returns a future
    producer.send(topic=topic, value=data)
    sleep(1)

# Block until all queued records have actually been delivered
producer.flush()
```
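The `value_serializer` turns each Python value into JSON, then UTF-8 bytes, so the CSV-style string travels as a JSON string literal (wrapped in quotes, with non-ASCII characters such as `º` escaped as `\uXXXX`, since `json.dumps` defaults to `ensure_ascii=True`). This sketch reproduces that round trip without a broker; `serialize` and `deserialize` are illustrative names, not kafka-python functions.

```python
from json import dumps, loads


def serialize(x) -> bytes:
    # Same transformation as the producer's value_serializer
    return dumps(x).encode("utf-8")


def deserialize(raw: bytes):
    # Inverse transformation, as applied to message.value in the consumer cell
    return loads(raw.decode("utf-8"))


record = "2022-09-08 05:34:33,Munich,19ºC"
wire = serialize(record)
print(wire)  # the bytes that end up stored in Kafka
assert deserialize(wire) == record
```

Because the serializer accepts any JSON-serializable value, the producer could just as well send structured records such as `{"city": "Tokyo", "temp_c": 23}`.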

### Consumer Mode -> Reads Data from Broker

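```python
# Create a consumer to read data from Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['localhost:9092'],
    # Where to start when there is no committed offset: 'earliest' reads
    # from the start of the topic; the default 'latest' reads only new messages
    auto_offset_reset='earliest',
    # auto_offset_reset='latest',
    # Offsets are only committed when a consumer group is set
    # group_id='team13',
    # Periodically commit the offsets that have been read (requires group_id)
    enable_auto_commit=True,
    # How often (in ms) to commit offsets to Kafka
    auto_commit_interval_ms=1000,
)

print('Reading Kafka Broker')
# This loop blocks waiting for new messages; interrupt the kernel to stop it
for message in consumer:
    # message.value is raw bytes by default (no value_deserializer was given)
    value = loads(message.value.decode('utf-8'))
    print(value)
    # Append each record to a local CSV log
    with open('kafka_log.csv', 'a', encoding='utf-8') as log:
        log.write(f'{value}\n')
```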

Sample output:
```
Reading Kafka Broker
2022-09-08 05:34:33,Munich,19ºC
2022-09-08 05:34:34,Munich,30ºC
2022-09-08 05:34:35,Munich,24ºC
2022-09-08 05:34:36,Tokyo,27ºC
2022-09-08 05:34:37,Pittsburgh,18ºC
2022-09-08 05:34:38,Pittsburgh,23ºC
2022-09-08 05:34:39,New York,24ºC
2022-09-08 05:34:40,London,21ºC
2022-09-08 05:34:41,Shanghai,18ºC
2022-09-08 05:34:42,Tokyo,28ºC
2022-09-08 05:35:23,London,31ºC
2022-09-08 05:35:24,New York,27ºC
2022-09-08 05:35:25,Pittsburgh,22ºC
2022-09-08 05:35:26,Shanghai,29ºC
2022-09-08 05:35:27,Pittsburgh,25ºC
2022-09-08 05:35:28,New York,30ºC
2022-09-08 05:35:29,New York,26ºC
2022-09-08 05:35:30,Bangalore,26ºC
2022-09-08 05:35:31,Shanghai,20ºC
2022-09-08 05:35:32,Tokyo,23ºC
2022-09-08 05:36:34,Pittsburgh,19ºC
2022-09-08 05:36:35,London,20ºC
2022-09-08 05:36:36,Munich,31ºC
2022-09-08 05:36:37,Shanghai,28ºC
2022-09-08 06:05:59,Tokyo,29ºC
2022-09-08 06:06:00,New York,27ºC
2022-09-08 06:06:01,New York,29ºC
2022-09-08 06:06:02,Bangalore,27ºC
2022-09-08 18:11:49,Tokyo,20ºC
```
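Each record (and each line of `kafka_log.csv`) has the form `timestamp,city,temperatureºC`. As a sketch of what a downstream step might do with the log, the hypothetical helpers below parse a record into typed fields and compute the mean temperature per city; they are not part of the demo.

```python
import csv
from collections import defaultdict
from datetime import datetime
from statistics import mean


def parse_reading(line: str):
    """Hypothetical helper: split 'YYYY-MM-DD HH:MM:SS,City,NNºC' into (datetime, city, int)."""
    ts, city, temp = next(csv.reader([line.strip()]))
    # Strip the trailing 'ºC' unit before converting to int
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"), city, int(temp.rstrip("ºC"))


def mean_temp_by_city(lines):
    """Hypothetical helper: average temperature per city over an iterable of records."""
    temps = defaultdict(list)
    for line in lines:
        if line.strip():  # skip blank lines
            _, city, temp = parse_reading(line)
            temps[city].append(temp)
    return {city: mean(values) for city, values in temps.items()}
```

Usage, assuming the consumer cell has written `kafka_log.csv`: `with open("kafka_log.csv", encoding="utf-8") as f: print(mean_temp_by_city(f))`.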

### Or use kcat
kcat is a command-line interface (CLI) for Kafka, previously known as kafkacat.

Install it with your package manager, for example:
```
brew install kcat
apt-get install kcat
```
Other package managers work too; on older distributions the package may still be named `kafkacat`.

Basic usage (`-b` broker, `-t` topic, `-p` partition):
```
kcat -b <broker> -t <topic> -p <partition>
```

Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

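Consume the demo topic from the beginning (`-C` consumer mode, `-o beginning` start offset, `-e` exit after the last message). In Jupyter, `$topic` is replaced with the value of the Python variable:
```
! kcat -b localhost:9092 -t "$topic" -C -o beginning -e
```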
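Outside Jupyter, the `!` shell escape and its `$topic` substitution are not available. A minimal sketch of the same kcat call from a plain Python script via `subprocess`, assuming kcat is on your `PATH`; `kcat_consume_cmd` and `run_kcat` are hypothetical helpers.

```python
import shutil
import subprocess
from typing import List


def kcat_consume_cmd(broker: str, topic: str) -> List[str]:
    # -C: consumer mode, -o beginning: start from the earliest offset,
    # -e: exit once the last available message has been read
    return ["kcat", "-b", broker, "-t", topic, "-C", "-o", "beginning", "-e"]


def run_kcat(broker: str, topic: str) -> str:
    if shutil.which("kcat") is None:
        raise FileNotFoundError("kcat is not installed or not on PATH")
    # Passing a list (no shell) means the topic name is never interpreted by a shell
    result = subprocess.run(
        kcat_consume_cmd(broker, topic),
        capture_output=True, text=True, check=True,
    )
    return result.stdout
```

For example, `print(run_kcat("localhost:9092", topic))` prints every record in the topic and returns, because `-e` stops kcat instead of letting it wait for new messages.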

### See more options
Print the full list of flags, from a terminal or prefixed with `!` in a notebook cell:
```
kcat -h
```