# 📚 Kafka Integration Example (Producer & Consumer)

In [None]:
# -- Imports --
from kafka import KafkaProducer, KafkaConsumer
import cv2
import base64
import numpy as np
import tensorflow as tf
import json

# -- Kafka Config --
# Topic name and broker address shared by the producer and the consumer.
KAFKA_TOPIC = "image-topic"
KAFKA_SERVER = "localhost:9092"

# -- Kafka Producer: Sending Images --
# Message values are Python dicts; each one is serialized to UTF-8 JSON bytes.
producer = KafkaProducer(
    bootstrap_servers=[KAFKA_SERVER],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

# Load a sample image for demonstration
sample_image = tf.keras.utils.get_file(
    "grace_hopper.jpg",
    "https://storage.googleapis.com/download.tensorflow.org/example_images/grace_hopper.jpg"
)

# cv2.imread reports failure by returning None instead of raising, so check
# explicitly; otherwise the error only shows up later inside cv2.imencode.
image = cv2.imread(sample_image)
if image is None:
    raise FileNotFoundError(f"Could not read image file: {sample_image}")

# cv2.imencode returns (success_flag, encoded_buffer); do not ignore the flag.
encoded_ok, buffer = cv2.imencode('.jpg', image)
if not encoded_ok:
    raise RuntimeError("Failed to JPEG-encode the sample image.")

# JSON cannot carry raw bytes, so the JPEG is shipped as base64 text.
jpg_as_text = base64.b64encode(buffer).decode('utf-8')

# Send the encoded image to Kafka
data = {"image": jpg_as_text}
producer.send(KAFKA_TOPIC, value=data)
# send() only queues the record; flush() pushes out whatever is buffered.
producer.flush()
print("✅ Sent one sample image to Kafka.")

# -- Kafka Consumer: Receiving Images --
# Values are decoded from UTF-8 JSON back into dicts. With
# auto_offset_reset='earliest' a consumer that has no committed offset starts
# from the oldest message still available on the topic.
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("✅ Listening for images...")
try:
    for message in consumer:
        # Each message is expected to look like {"image": "<base64 JPEG>"}.
        encoded_img = message.value['image']
        img_bytes = base64.b64decode(encoded_img)
        if not img_bytes:
            # An empty buffer makes cv2.imdecode fail; skip instead of crashing.
            print("⚠️ Skipping message with an empty image payload.")
            continue

        nparr = np.frombuffer(img_bytes, np.uint8)
        img_np = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img_np is None:
            # cv2.imdecode returns None when the bytes are not a valid image;
            # passing None to cv2.imshow would abort the whole stream.
            print("⚠️ Skipping message that could not be decoded as an image.")
            continue

        # Display the received image
        cv2.imshow('Kafka Image Stream', img_np)
        # waitKey also services the GUI event loop; pressing 'q' stops the stream.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Run cleanup even when the loop ends through an exception or a
    # KeyboardInterrupt (the usual way to stop this cell in a notebook).
    consumer.close()
    cv2.destroyAllWindows()
