In [None]:
pip install kafka-python

In [56]:
import os
import pandas as pd
from kafka import KafkaProducer
from json import dumps
from time import time
from dotenv import load_dotenv

In [57]:
# Load environment variables from .env file
load_dotenv()
KAFKA_SERVER = os.getenv('KAFKA_SERVER')
KAFKA_PORT = os.getenv('KAFKA_PORT')

In [58]:
# Initialize Kafka producer with optimized settings
producer = KafkaProducer(
    bootstrap_servers=[f"{KAFKA_SERVER}:{KAFKA_PORT}"], 
    value_serializer=lambda x: dumps(x).encode('utf-8'),
    batch_size=65536,  # Batch size of 64 KB
    linger_ms=20,  # 20 ms delay to optimize batching
    compression_type='lz4'  # Use LZ4 compression for faster processing
)

In [59]:
# Load CSV data
df = pd.read_csv("data/indexProcessed.csv")

In [60]:
# Number of messages to send
num_messages = 5000

In [61]:
# Record the start time
start_time = time()

In [62]:
# Send messages to Kafka topic
for _ in range(num_messages):
    dict_stock = df.sample(1).to_dict(orient="records")[0]
    producer.send('demo_test', value=dict_stock)

In [63]:
# Ensure all messages are sent before measuring the end time
producer.flush()

In [64]:
# Record the end time
end_time = time()

In [65]:
# Calculate and display the total time and average throughput
total_time = end_time - start_time
print(f"Sent {num_messages} messages in {total_time:.2f} seconds")
print(f"Average throughput: {num_messages / total_time:.2f} messages/second")

Sent 5000 messages in 18.27 seconds
Average throughput: 273.69 messages/second
