# Kafka Demo

### Connect to Kafka Broker Server 
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
The connection details for the Kafka server are on the Canvas Lab 2 assignment page.
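
Before running the notebook cells, it can help to confirm that the SSH tunnel is actually listening on the forwarded port. The following is a minimal sketch using only the Python standard library; `is_port_open` is a hypothetical helper name, not part of kafka-python, and it only checks that a TCP connection succeeds, not that a Kafka broker is on the other end.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) can be established."""
    try:
        # create_connection raises OSError (refused, timed out, ...) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `is_port_open("localhost", 9092)` should return `True` once the tunnel is up, assuming 9092 is the `<local_port>` you chose.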

### To kill the connection
```
lsof -ti:<local_port> | xargs kill -9
```

### Setup
I recommend setting up a Python virtual environment for this lab (and all later ones).
```
python -m venv <environment_name>
source <environment_name>/bin/activate
```
Then you can install the requirements.
```
python -m pip install kafka-python
```

In [10]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer
from typing import Dict, Any

# TODO: Fill in a unique identifier so your topic doesn't collide with others'
andrew_id = "mkipsang" # you can also use any unique identifier for yourself if you prefer :)
topic = f"lab02-{andrew_id}"
print(f"Topic: {topic}")

Topic: lab02-mkipsang


### Producer Mode -> Writes Data to Broker

In [11]:
# I have provided the following schema for messages. You may change the city data if you wish but it is optional.
def make_city_data(city: str, temperature_f: int) -> Dict[str, Any]:
    return {
        "city": city,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "temperature_f": temperature_f,  # temperature in Fahrenheit
    }

In [12]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [TODO]: Fill in the address of your Kafka bootstrap server
# [TODO]: Kafka expects messages as bytes. Explore the documentation and decide how to serialize Python dict objects into bytes.
# Hint: You may want to convert your Python dict → JSON string → UTF-8 bytes.

producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                        value_serializer=lambda v: dumps(v).encode("utf-8"))
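
The dict → JSON string → UTF-8 bytes conversion from the hint can be checked without a broker. This is a minimal sketch; `serialize` and `deserialize` are hypothetical helper names, and the sample record is made up for illustration.

```python
from json import dumps, loads
from typing import Any, Dict


def serialize(value: Dict[str, Any]) -> bytes:
    """Python dict -> JSON string -> UTF-8 bytes."""
    return dumps(value).encode("utf-8")


def deserialize(raw: bytes) -> Dict[str, Any]:
    """UTF-8 bytes -> JSON string -> Python dict."""
    return loads(raw.decode("utf-8"))


# Illustrative record, not real data
record = {"city": "Pittsburgh", "temperature_f": 64}
raw = serialize(record)
print(type(raw).__name__)          # bytes
print(deserialize(raw) == record)  # True
```

`serialize` does the same work as the lambda passed to `value_serializer`. On the reading side, kafka-python's `KafkaConsumer` accepts a matching `value_deserializer` argument, so a function like `deserialize` could be passed there instead of decoding each message by hand.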

In [13]:
# [TODO]: Add a few more examples of city data below
cities = [
    make_city_data("Pittsburgh", 64),
    make_city_data("Nairobi", 75),
    make_city_data("Cape Town", 70),
    make_city_data("Cairo", 85),
    make_city_data("Melbourne", 52),
]

print("Writing to Kafka Broker")
for i in range(10):
    data = cities[randint(0,len(cities)-1)] # random selection
    producer.send(topic=topic, value=data)
    sleep(1)

producer.flush()
print(f"Data written to topic: {topic}")

Writing to Kafka Broker
Data written to topic: lab02-mkipsang


### Consumer Mode -> Reads Data from Broker

In [15]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# [TODO]: Complete the missing ... parameters/arguments using the Kafka documentation
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",  # Experiment with different values (earliest, latest, none)
    # Periodically commit the offsets of messages that have been read
    enable_auto_commit=True,
    # How often (in milliseconds) to commit offsets to Kafka
    auto_commit_interval_ms=1000,
)

print('Reading Kafka Broker')
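for message in consumer:
    # message.value is bytes by default, so decode it to a str first
    payload = message.value.decode("utf-8")
    print(loads(payload))
    # Append the raw JSON line to the log file directly; passing it through
    # a shell `echo` would strip the double quotes from the JSON
    with open("kafka_log.csv", "a") as log_file:
        log_file.write(payload + "\n")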

Reading Kafka Broker
{'city': 'Melbourne', 'timestamp': '2025-09-05 10:10:34', 'temperature_f': 52}
{'city': 'Cape Town', 'timestamp': '2025-09-05 10:10:34', 'temperature_f': 70}
{'city': 'Nairobi', 'timestamp': '2025-09-05 10:10:34', 'temperature_f': 75}
{'city': 'Cairo', 'timestamp': '2025-09-05 10:10:34', 'temperature_f': 85}
{'city': 'Cairo', 'timestamp': '2025-09-05 10:10:34', 'temperature_f': 85}
{'city': 'Pittsburgh', 'timestamp': '2025-09-05 10:10:34', 'temperature_f': 64}
{'city': 'Nairobi', 'timestamp': '2025-09-05 10:10:34', 'temperature_f': 75}
{'city': 'Pittsburgh', 'timestamp': '2025-09-05 10:10:34', 'temperature_f': 64}
{'city': 'Cape Town', 'timestamp': '2025-09-05 10:10:34', 'temperature_f': 70}
{'city': 'Cape Town', 'timestamp': '2025-09-05 10:10:34', 'temperature_f': 70}
{'city': 'Cairo', 'timestamp': '2025-09-05 10:31:07', 'temperature_f': 85}
{'city': 'Melbourne', 'timestamp': '2025-09-05 10:31:07', 'temperature_f': 52}
{'city': 'Pittsburgh', 'timestamp': '2025-09-

KeyboardInterrupt: 
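
The loop above runs until it is interrupted by hand. A bounded read is one alternative. The sketch below assumes only that the consumer is an iterable of records with a bytes `.value` attribute; `read_messages` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for real Kafka records so the example runs without a broker.

```python
from itertools import islice
from json import loads
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def read_messages(consumer: Iterable[Any], limit: int) -> List[Dict[str, Any]]:
    """Decode at most `limit` records; each record must expose a bytes `.value`."""
    return [loads(m.value.decode("utf-8")) for m in islice(consumer, limit)]


# Stand-in records for illustration only
fake_records = [
    SimpleNamespace(value=b'{"city": "Cairo", "temperature_f": 85}'),
    SimpleNamespace(value=b'{"city": "Nairobi", "temperature_f": 75}'),
]
print(read_messages(fake_records, 1))
```

With a real `KafkaConsumer`, the consumer object can be passed in directly. If fewer than `limit` messages are available, iteration blocks unless the consumer was created with `consumer_timeout_ms`, which makes kafka-python stop iterating after that many milliseconds without a new message.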

### Use kcat!
kcat is a command-line interface (CLI) for Kafka, previously known as kafkacat.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [None]:
# [TODO]: Use kcat to consume the first 5 messages from your topic. Use the -f flag to include the offset in the output.
# Paste the output below and explain what the offset refers to.

PS C:\Windows\system32> docker run --rm --add-host=localhost:host-gateway edenhill/kcat:1.7.1 -X broker.address.family=v4 -b host.docker.internal:9092 -t lab02-mkipsang -C -o beginning -c 5 -f "Offset: %o, Message: %s`n"
Offset: 0, Message: {"city": "Melbourne", "timestamp": "2025-09-05 10:10:34", "temperature_f": 52}
Offset: 1, Message: {"city": "Cape Town", "timestamp": "2025-09-05 10:10:34", "temperature_f": 70}
Offset: 2, Message: {"city": "Nairobi", "timestamp": "2025-09-05 10:10:34", "temperature_f": 75}
Offset: 3, Message: {"city": "Cairo", "timestamp": "2025-09-05 10:10:34", "temperature_f": 85}
Offset: 4, Message: {"city": "Cairo", "timestamp": "2025-09-05 10:10:34", "temperature_f": 85}
PS C:\Windows\system32>


### Explanation
- The **offset** is the sequential ID of a message within one partition of the topic; it is unique per partition, not across the whole topic.
- It starts at 0 and increases by 1 for each new record, which is why the five messages above carry offsets 0 through 4.
- Kafka stores the last committed offset for each consumer group and partition, so a consumer can resume reading from the correct position after a restart.
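
To make the resume behaviour concrete, here is a toy in-memory model of a single partition. It is a sketch for intuition only: `PartitionLog`, `append`, and `poll` are hypothetical names and not the Kafka or kafka-python API, and starting a new group at offset 0 mirrors `auto_offset_reset="earliest"`.

```python
from typing import Dict, List, Tuple


class PartitionLog:
    """Toy model of one topic partition with per-group committed offsets."""

    def __init__(self) -> None:
        self.records: List[str] = []
        self.committed: Dict[str, int] = {}  # group id -> next offset to read

    def append(self, record: str) -> int:
        """Store a record and return the offset it was assigned."""
        self.records.append(record)
        return len(self.records) - 1

    def poll(self, group: str, max_records: int) -> List[Tuple[int, str]]:
        """Return up to max_records (offset, record) pairs and commit the new position."""
        start = self.committed.get(group, 0)  # a new group starts at the earliest offset
        batch = list(enumerate(self.records[start:start + max_records], start=start))
        self.committed[group] = start + len(batch)
        return batch


log = PartitionLog()
for city in ["Melbourne", "Cape Town", "Nairobi", "Cairo", "Cairo"]:
    log.append(city)

first = log.poll("demo-group", 2)   # [(0, 'Melbourne'), (1, 'Cape Town')]
second = log.poll("demo-group", 2)  # [(2, 'Nairobi'), (3, 'Cairo')]
print(first, second)
```

The second `poll` picks up at offset 2 because the group's position was committed after the first call, which is the same idea as a consumer resuming after a restart.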