In [4]:
# Install required packages
!pip uninstall kafka-python -y
!pip install kafka-python-ng pandas

Collecting pandas
  Downloading pandas-2.3.3-cp312-cp312-win_amd64.whl.metadata (19 kB)
Collecting numpy>=1.26.0 (from pandas)
  Downloading numpy-2.3.4-cp312-cp312-win_amd64.whl.metadata (60 kB)
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Downloading tzdata-2025.2-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.3.3-cp312-cp312-win_amd64.whl (11.0 MB)
   ---------------------------------------- 0.0/11.0 MB ? eta -:--:--
   -------- ------------------------------- 2.4/11.0 MB 16.8 MB/s eta 0:00:01
   --------------------------- ------------ 7.6/11.0 MB 21.4 MB/s eta 0:00:01
   ---------------------------------------- 11.0/11.0 MB 22.2 MB/s eta 0:00:00
Downloading numpy-2.3.4-cp312-cp312-win_amd64.whl (12.8 MB)
   ---------------------------------------- 0.0/12.8 MB ? eta -:--:--
   ------------------ --------------------- 6.0/12.8 MB 28.4 MB/s eta 0:00:01
   ------------------


[notice] A new release of pip is available: 24.2 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [5]:
from time import sleep
from json import dumps
from kafka import KafkaProducer
import pandas as pd

# --- Configuration -----------------------------------------------------------
topic_name = 'bigdata'
kafka_server = 'localhost:9092'
file_path = '../dataset/weather_dataset.csv'
send_interval_seconds = 5   # delay between messages, to simulate a live stream
progress_every = 10         # print a progress line every N messages (plus the first)

# Load the data *before* opening a broker connection, so a bad path does not
# leave an unclosed producer behind.
print(f"\nReading data from: {file_path}")
csv_data = pd.read_csv(file_path)
total_records = len(csv_data)
print(f"✓ Loaded {total_records} records")

print(f"Connecting to Kafka at {kafka_server}...")
producer = KafkaProducer(
    bootstrap_servers=kafka_server,
    # Each message value is a dict, serialized as UTF-8 encoded JSON.
    value_serializer=lambda x: dumps(x).encode('utf-8')
)
print("✓ Connected to Kafka successfully!")

print("\n" + "="*60)
print("Starting to send data to Kafka...")
print("="*60)

try:
    # `position` is the 1-based count of rows sent, independent of the frame's index labels.
    for position, (_, row) in enumerate(csv_data.iterrows(), start=1):
        # Every field is sent as a string (NaN becomes 'nan'), so consumers see a uniform schema.
        data = {key: str(value) for key, value in row.items()}

        # Send the data as a Kafka message (asynchronous; buffered by the producer).
        producer.send(topic_name, value=data)

        if position == 1 or position % progress_every == 0:
            # NOTE(review): recorded runs always show Weather=N/A, so the CSV presumably
            # has no 'weather' column — confirm the intended column name.
            print(f"[{position}/{total_records}] Sent: Date={data.get('date_time', 'N/A')}, "
                  f"Weather={data.get('weather', 'N/A')}, Temp={data.get('tempC', 'N/A')}°C")

        sleep(send_interval_seconds)
finally:
    # Always runs, including on KeyboardInterrupt: delivers buffered messages
    # and releases the producer's network resources.
    producer.close()

# Only reached when the loop completed without an exception.
print("\n" + "="*60)
print("✓ All data sent successfully!")
print("="*60)

Connecting to Kafka at localhost:9092...
✓ Connected to Kafka successfully!

Reading data from: ../dataset/weather_dataset.csv
✓ Loaded 135816 records

Starting to send data to Kafka...
[1/135816] Sent: Date=2008-07-01 00:00:00, Weather=N/A, Temp=25°C
✓ Loaded 135816 records

Starting to send data to Kafka...
[1/135816] Sent: Date=2008-07-01 00:00:00, Weather=N/A, Temp=25°C
[10/135816] Sent: Date=2008-07-01 09:00:00, Weather=N/A, Temp=28°C
[10/135816] Sent: Date=2008-07-01 09:00:00, Weather=N/A, Temp=28°C
[20/135816] Sent: Date=2008-07-01 19:00:00, Weather=N/A, Temp=27°C
[20/135816] Sent: Date=2008-07-01 19:00:00, Weather=N/A, Temp=27°C
[30/135816] Sent: Date=2008-07-02 05:00:00, Weather=N/A, Temp=24°C
[30/135816] Sent: Date=2008-07-02 05:00:00, Weather=N/A, Temp=24°C
[40/135816] Sent: Date=2008-07-02 15:00:00, Weather=N/A, Temp=29°C
[40/135816] Sent: Date=2008-07-02 15:00:00, Weather=N/A, Temp=29°C
[50/135816] Sent: Date=2008-07-03 01:00:00, Weather=N/A, Temp=25°C
[50/135816] Sent: Da

KeyboardInterrupt: 