## Importing necessary libraries

- pandas: A library used here to load the CSV file into a DataFrame.
- KafkaProducer: A Kafka producer client for publishing messages to Kafka topics.
- sleep: A function to introduce delays in the execution.
- dumps: A function for serializing Python objects to JSON strings.
- json: A library for working with JSON data.

In [1]:
%pip install kafka-python
%pip install s3fs

Note: you may need to restart the kernel to use updated packages.


In [1]:
import pandas as pd
from kafka import KafkaProducer
from time import sleep
from json import dumps
import json

## Creating a Kafka producer
- This creates a Kafka producer object named producer.
- The bootstrap_servers parameter specifies the Kafka broker addresses to connect to.
- The value_serializer parameter is set to serialize the message value as JSON-encoded bytes.


In [2]:
producer = KafkaProducer(bootstrap_servers=[':9092'],  # add the broker's IP or hostname before ':9092'
                         value_serializer=lambda x:
                         dumps(x).encode('utf-8'))  # serialize each value to JSON-encoded UTF-8 bytes
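
To make the effect of value_serializer concrete, here is a minimal sketch that applies the same `dumps(x).encode('utf-8')` logic outside of Kafka, so it runs without a broker. The `serialize` and `deserialize` names are hypothetical helpers introduced for illustration; `deserialize` is only an assumption about how a consumer might reverse the encoding.

```python
from json import dumps, loads


def serialize(value):
    """Same logic as the value_serializer lambda: object -> JSON text -> UTF-8 bytes."""
    return dumps(value).encode("utf-8")


def deserialize(raw):
    """Hypothetical consumer-side inverse: UTF-8 bytes -> JSON text -> object."""
    return loads(raw.decode("utf-8"))


payload = serialize({"key": "value"})
print(payload)               # b'{"key": "value"}'
print(deserialize(payload))  # {'key': 'value'}
```

The bytes printed first are what the producer would hand to the broker as the message value.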

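## Sending a single message to a Kafka topic
- This sends a single message to the Kafka topic named 'demo_testing2'.
- The message value is a dictionary with a single key-value pair.
- producer.send() is asynchronous: it returns a FutureRecordMetadata object immediately, as the cell output shows, rather than waiting for delivery.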

In [8]:
producer.send('demo_testing2', value={'key':'value'})

<kafka.producer.future.FutureRecordMetadata at 0x17ce1c4eb90>

## Reading data from a CSV file

This reads the data from a CSV file named 'indexProcessed.csv' into a pandas DataFrame named df.

In [10]:
df = pd.read_csv("indexProcessed.csv")

In [13]:
# print the first 5 rows
df.head()

Unnamed: 0,Index,Date,Open,High,Low,Close,Adj Close,Volume,CloseUSD
0,HSI,1986-12-31,2568.300049,2568.300049,2568.300049,2568.300049,2568.300049,0.0,333.879006
1,HSI,1987-01-02,2540.100098,2540.100098,2540.100098,2540.100098,2540.100098,0.0,330.213013
2,HSI,1987-01-05,2552.399902,2552.399902,2552.399902,2552.399902,2552.399902,0.0,331.811987
3,HSI,1987-01-06,2583.899902,2583.899902,2583.899902,2583.899902,2583.899902,0.0,335.906987
4,HSI,1987-01-07,2607.100098,2607.100098,2607.100098,2607.100098,2607.100098,0.0,338.923013


In [12]:
# convert a random row into a dictionary to show how the data will be sent to the topic
df.sample(1).to_dict(orient="records")[0]


{'Index': '000001.SS',
 'Date': '1997-10-21',
 'Open': 1190.494995,
 'High': 1192.339966,
 'Low': 1179.573975,
 'Close': 1191.869995,
 'Adj Close': 1191.869995,
 'Volume': 0.0,
 'CloseUSD': 190.6991992}

## Generating and sending messages in a loop
- This creates an infinite loop that sends messages to the Kafka topic until the cell is interrupted.
- Inside the loop, it randomly samples one row from the DataFrame.
- to_dict(orient="records") converts the sampled row to a list containing one dictionary, and [0] selects that dictionary.
- The resulting dictionary, dict_stock, represents the message value.
- The message value is sent to the Kafka topic using producer.send().
- The loop then pauses for 2 seconds using sleep(2).

In [None]:
while True:
    dict_stock = df.sample(1).to_dict(orient="records")[0]
    producer.send('demo_testing2', value=dict_stock)
    sleep(2)
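
As a variation on the loop above, the sketch below sends a fixed number of samples instead of running forever, which can be easier to test. It is not the notebook's method: `send_samples` is a hypothetical helper, the rows are plain dictionaries copied from the df.head() output rather than a DataFrame, and the `send` argument is any callable, so the example runs without a broker.

```python
import random
from time import sleep


def send_samples(rows, send, count, delay=0.0, seed=None):
    """Pass `count` randomly chosen rows to `send`, pausing `delay` seconds after each."""
    rng = random.Random(seed)
    sent = []
    for _ in range(count):
        row = rng.choice(rows)  # plays the role of df.sample(1)
        send(row)               # plays the role of producer.send(...)
        sent.append(row)
        sleep(delay)
    return sent


# Two records taken from the df.head() output, trimmed to three columns.
rows = [
    {"Index": "HSI", "Date": "1986-12-31", "Close": 2568.300049},
    {"Index": "HSI", "Date": "1987-01-02", "Close": 2540.100098},
]

outbox = []  # stand-in for the Kafka topic
send_samples(rows, outbox.append, count=3)
print(len(outbox))  # 3
```

With the real producer, the same helper could be called with `df.to_dict(orient="records")` as `rows` and `lambda row: producer.send('demo_testing2', value=row)` as `send`.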

## Flushing the producer
This blocks until every message still held in the producer's buffer has been sent to the Kafka broker. Because the loop above never exits on its own, this cell runs only after the loop has been interrupted.


In [9]:
producer.flush()  # block until all buffered messages have been sent to the broker
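
One way to make sure the flush always happens is to wrap the send loop in try/finally. The sketch below illustrates that pattern under stated assumptions: `RecordingProducer` is a hypothetical in-memory stand-in that only mimics the send(), flush(), and close() methods of KafkaProducer so the code runs without a broker, and `run` is a hypothetical helper.

```python
class RecordingProducer:
    """Hypothetical stand-in for KafkaProducer; keeps messages in memory only."""

    def __init__(self):
        self.buffer = []     # messages accepted by send() but not yet "delivered"
        self.delivered = []  # messages moved out of the buffer by flush()
        self.closed = False

    def send(self, topic, value):
        self.buffer.append((topic, value))

    def flush(self):
        self.delivered.extend(self.buffer)
        self.buffer.clear()

    def close(self):
        self.flush()
        self.closed = True


def run(producer, messages, topic="demo_testing2"):
    """Send every message, then flush and close even if the loop is interrupted."""
    try:
        for message in messages:
            producer.send(topic, value=message)
    except KeyboardInterrupt:
        pass  # e.g. the notebook cell was interrupted by the user
    finally:
        producer.flush()
        producer.close()


fake = RecordingProducer()
run(fake, [{"key": "value"}, {"key": "other"}])
print(len(fake.delivered), len(fake.buffer), fake.closed)  # 2 0 True
```

Passing a real KafkaProducer instead of the stand-in should work the same way, since it also provides send(), flush(), and close().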