# Kafka Demo

### Connect to Kafka Broker Server 
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
See the Lab 2 assignment page on Canvas for the Kafka server connection details (the values to substitute into the command above).

### To kill the connection
```
lsof -ti:<local_port> | xargs kill -9
```

### Setup
```
python -m pip install kafka-python
```

In [1]:
import csv
import os
from datetime import datetime
from json import dumps, loads
from random import choice, randint
from time import sleep

from kafka import KafkaConsumer, KafkaProducer

# Kafka topic to use — set the suffix to your own recitation section :)
# Valid section letters: b, c, d, e, f.
# NOTE: the producer cell further down reassigns `topic` to "breakfast".
topic = "recitation-x"

### Producer Mode -> Writes Data to Broker

In [2]:
from kafka import KafkaProducer
from json import dumps
from time import sleep
from datetime import datetime
from random import randint

# Create a producer to write data to Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# Kafka bootstrap server
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

# Topic name updated to 'breakfast'
topic = "breakfast"

# [TODO]: Define breakfast items with main ingredients
breakfast_items = [
    {"name": "Pancakes", "ingredient1": "Flour", "ingredient2": "Eggs", "ingredient3": "Milk"},
    {"name": "Omelette", "ingredient1": "Eggs", "ingredient2": "Cheese", "ingredient3": "Bell Peppers"},
    {"name": "Smoothie", "ingredient1": "Banana", "ingredient2": "Yogurt", "ingredient3": "Honey"},
    {"name": "Toast", "ingredient1": "Bread", "ingredient2": "Butter", "ingredient3": "Jam"},
    {"name": "Cereal", "ingredient1": "Oats", "ingredient2": "Milk", "ingredient3": "Fruits"}
]

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    breakfast_data = breakfast_items[randint(0, len(breakfast_items) - 1)]  # Select a random breakfast item
    data = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "dish": breakfast_data["name"],
        "ingredient1": breakfast_data["ingredient1"],
        "ingredient2": breakfast_data["ingredient2"],
        "ingredient3": breakfast_data["ingredient3"]
    }
    
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1) 

print("Breakfast data successfully sent to Kafka!")


Writing to Kafka Broker
Writing: {'timestamp': '2025-02-05 17:25:14', 'dish': 'Toast', 'ingredient1': 'Bread', 'ingredient2': 'Butter', 'ingredient3': 'Jam'}
Writing: {'timestamp': '2025-02-05 17:25:15', 'dish': 'Smoothie', 'ingredient1': 'Banana', 'ingredient2': 'Yogurt', 'ingredient3': 'Honey'}
Writing: {'timestamp': '2025-02-05 17:25:16', 'dish': 'Smoothie', 'ingredient1': 'Banana', 'ingredient2': 'Yogurt', 'ingredient3': 'Honey'}
Writing: {'timestamp': '2025-02-05 17:25:17', 'dish': 'Omelette', 'ingredient1': 'Eggs', 'ingredient2': 'Cheese', 'ingredient3': 'Bell Peppers'}
Writing: {'timestamp': '2025-02-05 17:25:18', 'dish': 'Omelette', 'ingredient1': 'Eggs', 'ingredient2': 'Cheese', 'ingredient3': 'Bell Peppers'}
Writing: {'timestamp': '2025-02-05 17:25:19', 'dish': 'Pancakes', 'ingredient1': 'Flour', 'ingredient2': 'Eggs', 'ingredient3': 'Milk'}
Writing: {'timestamp': '2025-02-05 17:25:20', 'dish': 'Smoothie', 'ingredient1': 'Banana', 'ingredient2': 'Yogurt', 'ingredient3': 'Hone

### Consumer Mode -> Reads Data from Broker

In [3]:
# Create a consumer to read data from Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
# Uses names imported in the first code cell (KafkaConsumer, loads, csv) and
# `topic`, which the producer cell above sets to "breakfast".

LOG_PATH = "kafka_log.csv"
# Column order of each row appended to LOG_PATH
LOG_FIELDS = ("timestamp", "dish", "ingredient1", "ingredient2", "ingredient3")

consumer = KafkaConsumer(
    topic,  # same topic the producer cell wrote to
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # with no committed offset, start from the oldest retained message
    enable_auto_commit=True,  # Commit that an offset has been read
    auto_commit_interval_ms=1000,  # How often to tell Kafka an offset has been read
    # NOTE(review): no group_id is passed; kafka-python disables offset commits when
    # group_id is None, so the two auto-commit settings above are likely no-ops — confirm.
    value_deserializer=lambda x: loads(x.decode('utf-8')),  # JSON bytes -> dict
    consumer_timeout_ms=5000  # stop iterating after 5 s without a new message
)

print('Reading from Kafka Broker...')
try:
    # Open the log once for the whole read rather than once per message.
    # csv.writer quotes fields containing commas/quotes (the hand-built f-string did not);
    # lineterminator="\n" keeps the original line endings (csv's default is "\r\n").
    with open(LOG_PATH, "a", newline="", encoding="utf-8") as log_file:
        writer = csv.writer(log_file, lineterminator="\n")
        for message in consumer:
            message_data = message.value  # already a dict, via value_deserializer
            print(message_data)
            writer.writerow([message_data[field] for field in LOG_FIELDS])
finally:
    # Release the broker connection even if the loop is interrupted (e.g. KeyboardInterrupt).
    consumer.close()

print("Finished reading messages.")


Reading from Kafka Broker...
{'timestamp': '2025-02-05 17:25:14', 'dish': 'Toast', 'ingredient1': 'Bread', 'ingredient2': 'Butter', 'ingredient3': 'Jam'}
{'timestamp': '2025-02-05 17:25:15', 'dish': 'Smoothie', 'ingredient1': 'Banana', 'ingredient2': 'Yogurt', 'ingredient3': 'Honey'}
{'timestamp': '2025-02-05 17:25:16', 'dish': 'Smoothie', 'ingredient1': 'Banana', 'ingredient2': 'Yogurt', 'ingredient3': 'Honey'}
{'timestamp': '2025-02-05 17:25:17', 'dish': 'Omelette', 'ingredient1': 'Eggs', 'ingredient2': 'Cheese', 'ingredient3': 'Bell Peppers'}
{'timestamp': '2025-02-05 17:25:18', 'dish': 'Omelette', 'ingredient1': 'Eggs', 'ingredient2': 'Cheese', 'ingredient3': 'Bell Peppers'}
{'timestamp': '2025-02-05 17:25:19', 'dish': 'Pancakes', 'ingredient1': 'Flour', 'ingredient2': 'Eggs', 'ingredient3': 'Milk'}
{'timestamp': '2025-02-05 17:25:20', 'dish': 'Smoothie', 'ingredient1': 'Banana', 'ingredient2': 'Yogurt', 'ingredient3': 'Honey'}
{'timestamp': '2025-02-05 17:25:21', 'dish': 'Pancakes

KeyboardInterrupt: 

# Use kcat!
kcat (previously known as kafkacat) is a command-line interface (CLI) tool for working with Kafka.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [None]:
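# kcat command: connect to the local Kafka broker, select a topic, and consume its messages starting from the earliest offset
# e.g. kcat -b localhost:9092 -t breakfast -C -o beginning -e   (-e exits once the end of the topic is reached)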