In [None]:
# Import the pandas library as pd for data manipulation and analysis, particularly for working with tabular data and time series
import pandas as pd

# Import sleep from the time module to enable pausing the execution of the script for a specified amount of time
from time import sleep

# Import KafkaProducer from the kafka library to connect to Kafka and send messages to a Kafka topic
from kafka import KafkaProducer

# Import dumps from the json module to serialize Python objects into JSON formatted strings
from json import dumps

# Import the json module for working with JSON data, providing capabilities to encode and decode JSON
import json

In [None]:
# Initialize a KafkaProducer to send messages to a Kafka topic
# The KafkaProducer connects to Kafka brokers using the specified EC2 instance public IP and port
producer = KafkaProducer(
    bootstrap_servers=['Put your EC2 instance public IP here:9092'],  # List of Kafka broker addresses where the producer will connect to send messages
    value_serializer=lambda x: dumps(x).encode('utf-8')  # Serialize message values to JSON formatted strings and encode them in UTF-8 before sending
)


In [None]:

# Load the dataset from a CSV file into a pandas DataFrame
# The 'stock_data.csv' file is read, and its contents are converted into a DataFrame 'df'
# This DataFrame 'df' will hold the stock market data for further processing and analysis
df = pd.read_csv("stock_data.csv")


In [None]:
df.head()

In [None]:
import time  # Ensure this import is at the top of your file

# Assuming 'producer' is already defined and connected to Kafka
# Assuming 'df' is your DataFrame containing stock data

while True:
    try:
        # Sample a single record from the DataFrame and convert it to a dictionary
        dict_stock = df.sample(1).to_dict(orient="records")[0]
        
        # Send the dictionary as a message to the 'demo_test' topic in Kafka
        producer.send('dict_mess', value=dict_stock)
        
        # Print the sent message for logging/debugging purposes (optional)
        print(f"Sent: {dict_stock}")
        
        # Sleep for 2 seconds to rate limit the message sending
        time.sleep(2)
    except KeyboardInterrupt:
        # Gracefully exit the loop if the script is stopped with Ctrl+C
        print("Stopping the producer...")
        break
    except Exception as e:
        # Handle other exceptions to avoid crashing the script
        print(f"An error occurred: {e}")
        # Decide on whether to break or continue based on the error
        break  # or 'continue' based on your requirement


In [None]:
# Ensure all messages are sent and delivered by the KafkaProducer
# The flush() method blocks until all pending messages in the buffer are successfully delivered to the Kafka broker
producer.flush()