# Kafka Producer - Clean & Improved

Production-ready Kafka producer using Python: it repeatedly samples a random row from a CSV file and publishes it to a Kafka topic as JSON.

In [None]:

import json
import time
import pandas as pd
from kafka import KafkaProducer
from kafka.errors import KafkaError


In [None]:

# Broker address ("host:port") handed to the producer as its bootstrap server.
KAFKA_BROKER = "localhost:9092"
# Topic that each sampled record is sent to.
TOPIC_NAME = "demo_test"
# CSV file whose rows are sampled and published.
CSV_PATH = "data/indexProcessed.csv"
# Pause between consecutive sends, in seconds (passed to time.sleep).
SLEEP_INTERVAL = 1


In [None]:

def create_producer(broker: str) -> KafkaProducer:
    """Build a KafkaProducer that publishes JSON-encoded values.

    Args:
        broker: Address of the bootstrap server, e.g. ``"host:port"``.

    Returns:
        A producer configured with ``retries=3`` and ``linger_ms=10`` whose
        values are serialized with ``json.dumps`` and encoded as UTF-8.
    """

    def to_json_bytes(value):
        # Serialize the value to JSON text, then encode it to UTF-8 bytes.
        return json.dumps(value).encode("utf-8")

    settings = {
        "bootstrap_servers": [broker],
        "value_serializer": to_json_bytes,
        "retries": 3,
        "linger_ms": 10,
    }
    return KafkaProducer(**settings)


In [None]:

def load_data(csv_path: str) -> pd.DataFrame:
    """Read the CSV file at *csv_path* into a DataFrame.

    Args:
        csv_path: Location of the CSV file, passed straight to
            ``pandas.read_csv`` with default parsing options.

    Returns:
        The parsed contents of the file.
    """
    frame = pd.read_csv(csv_path)
    return frame


In [None]:

def send_random_records(producer: KafkaProducer, df: pd.DataFrame):
    """Publish randomly sampled rows of *df* until interrupted or Kafka fails.

    Each iteration samples one row, sends it to ``TOPIC_NAME`` as a dict,
    waits up to 10 seconds on the send's future, prints the record, and then
    sleeps for ``SLEEP_INTERVAL`` seconds.

    Args:
        producer: Producer used for sending. It is flushed and closed before
            this function returns, whatever ended the loop.
        df: DataFrame the rows are sampled from.

    A ``KeyboardInterrupt`` or ``KafkaError`` ends the loop and is reported
    with ``print`` instead of being re-raised; any other exception propagates
    after the producer has been flushed and closed.
    """
    try:
        while True:
            sampled = df.sample(1)
            record = sampled.to_dict(orient="records")[0]
            # Wait for the send's future to resolve before reporting success.
            producer.send(TOPIC_NAME, value=record).get(timeout=10)
            print(f"Sent record: {record}")
            time.sleep(SLEEP_INTERVAL)
    except KafkaError as e:
        print(f"Kafka error occurred: {e}")
    except KeyboardInterrupt:
        print("Stopping producer...")
    finally:
        # Always push out buffered messages and release the producer.
        producer.flush()
        producer.close()


In [None]:

def main():
    """Load the CSV data, create the producer, and stream random records.

    The CSV is read before the producer is created so that a missing or
    unreadable file raises without leaving behind a producer that nothing
    closes (``send_random_records`` is the only place the producer is closed).
    """
    df = load_data(CSV_PATH)
    producer = create_producer(KAFKA_BROKER)
    send_random_records(producer, df)


In [None]:

# Run the producer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
