# Producer C


In [None]:
import pandas as pd
from kafka import KafkaProducer
from datetime import datetime
import time
import json

In [15]:
# Producer C: replays camera events from a CSV file to Kafka, one message per
# batch_id, pausing `batch_interval` seconds between batches.

# `timezone` is needed for an aware UTC clock; imported here so this cell stays
# runnable with the existing top-of-notebook import cell.
from datetime import timezone

# Load the data
df = pd.read_csv("data/camera_event_C.csv")

# Kafka settings
producer_id = 3
camera_id = 3  # value stamped onto every record as "camera_id"
topic_name = "camera_events"
# kafka_host = "10.192.1.110:9092"
# kafka_host = "192.168.100.165:9092"
kafka_host = "10.192.0.245:9092"

batch_interval = 1  # seconds

# Connect Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=[kafka_host],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: str(k).encode("utf-8"),
    api_version=(0, 10)
)

try:
    # Send batch-wise: every group of rows sharing a batch_id becomes one message.
    for batch_id, group in df.groupby("batch_id"):
        # Read the clock once, as an aware UTC datetime. Calling .timestamp() on
        # a naive datetime would interpret it as local time and shift the Kafka
        # timestamp by the host's UTC offset.
        now_utc = datetime.now(timezone.utc)
        # Keep the payload field in the same naive-UTC ISO format as before
        # (no "+00:00" suffix) so consumers parsing it are unaffected.
        send_time = now_utc.replace(tzinfo=None).isoformat()
        timestamp_ms = int(now_utc.timestamp() * 1000)

        # Add extra metadata (on a copy, so the source DataFrame is not modified)
        group = group.copy()
        group["producer_id"] = producer_id
        group["camera_id"] = camera_id
        group["send_time"] = send_time

        records = group.to_dict(orient="records")

        print(f"Sending batch_id={batch_id} | Records={len(records)} | Timestamp={send_time} | Sending from producer {producer_id}")

        # Send to Kafka (as batch payload); send() is asynchronous and only
        # enqueues the message for the background sender.
        producer.send(
            topic=topic_name,
            key=producer_id,
            value=records,
            timestamp_ms=timestamp_ms
        )

        time.sleep(batch_interval)

    # Block until every enqueued message has actually been delivered.
    producer.flush()
    print("All batches sent.")
finally:
    # Always release the connection, including when the cell is interrupted.
    producer.close()


📤 Sending batch_id=1 | Records=1 | Timestamp=2025-05-28T03:54:01.945758 | Sending from producer 3
📤 Sending batch_id=2 | Records=1 | Timestamp=2025-05-28T03:54:02.777606 | Sending from producer 3
📤 Sending batch_id=3 | Records=2 | Timestamp=2025-05-28T03:54:03.597699 | Sending from producer 3
📤 Sending batch_id=4 | Records=2 | Timestamp=2025-05-28T03:54:04.407859 | Sending from producer 3
📤 Sending batch_id=5 | Records=1 | Timestamp=2025-05-28T03:54:05.213467 | Sending from producer 3
📤 Sending batch_id=6 | Records=1 | Timestamp=2025-05-28T03:54:06.018821 | Sending from producer 3
📤 Sending batch_id=7 | Records=2 | Timestamp=2025-05-28T03:54:06.823895 | Sending from producer 3
📤 Sending batch_id=8 | Records=1 | Timestamp=2025-05-28T03:54:07.634619 | Sending from producer 3
📤 Sending batch_id=9 | Records=2 | Timestamp=2025-05-28T03:54:08.460194 | Sending from producer 3
📤 Sending batch_id=10 | Records=2 | Timestamp=2025-05-28T03:54:09.269930 | Sending from producer 3
📤 Sending batch_id=

KeyboardInterrupt: 