In [None]:
from kafka import KafkaConsumer
import json
import pandas as pd
from datetime import datetime

# Create a Kafka consumer
consumer = KafkaConsumer(
    'stock_prices',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    auto_offset_reset='earliest',  # start from earliest messages
    enable_auto_commit=True,
    group_id='stock_group'
)

# Storage list
records = []

print("Listening for new temp messages... (Ctrl+C to stop)")

df = pd.DataFrame()

try:
    for msg in consumer:
        data = msg.value
        data["timestamp"] = datetime.now().isoformat(timespec="seconds") #timestamp now, should come from producer side
        print("Received:", data)

        records.append(data)

        # Periodically update CSV every 20 messages
        if len(records) % 20 == 0:
            df = pd.concat([df, pd.DataFrame(records)], ignore_index = True)
            df.to_csv("stock_price.csv", index=False)
            print(f"Saved {len(records)} records to stock_price.csv")

except KeyboardInterrupt:
    print("\nStopped by user.")
    if records:
        df = pd.DataFrame(records)
        #df.to_csv("stock_price.csv", index=False)
        print(f"Final save: {len(records)} total records.")



Listening for new temp messages... (Ctrl+C to stop)
Received: {'ticker': 'AAPL', 'price': 272.9599914550781, 'timestamp': '2025-12-17T21:11:13'}
Received: {'ticker': 'MSFT', 'price': 478.6000061035156, 'timestamp': '2025-12-17T21:11:13'}
Received: {'ticker': 'GOOGL', 'price': 298.0199890136719, 'timestamp': '2025-12-17T21:11:13'}
Received: {'ticker': 'TSLA', 'price': 466.739990234375, 'timestamp': '2025-12-17T21:11:13'}
Received: {'ticker': 'AAPL', 'price': 273.239990234375, 'timestamp': '2025-12-17T21:11:13'}
Received: {'ticker': 'MSFT', 'price': 478.57989501953125, 'timestamp': '2025-12-17T21:11:13'}
Received: {'ticker': 'GOOGL', 'price': 297.6650085449219, 'timestamp': '2025-12-17T21:11:13'}
Received: {'ticker': 'TSLA', 'price': 467.7550964355469, 'timestamp': '2025-12-17T21:11:13'}
Received: {'ticker': 'AAPL', 'price': 273.239990234375, 'timestamp': '2025-12-17T21:11:13'}
Received: {'ticker': 'MSFT', 'price': 478.57989501953125, 'timestamp': '2025-12-17T21:11:13'}
Received: {'ticke