# In [None]:
import csv
import json
import os
from datetime import datetime

import boto3
from confluent_kafka import Consumer


# Connection settings. Each one can be overridden by an environment variable of
# the same name; the defaults are the original hard-coded values.
KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "3.133.95.1:9092")  # host:port for bootstrap.servers
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "finnhub_profiles")
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "realtimestockmarketdata")

# Build the Kafka consumer. It joins "finnhub_consumer_group"; if the group has
# no committed offset yet, "auto.offset.reset": "earliest" makes it start from
# the beginning of the topic.
# NOTE(review): "enable.auto.commit" is not set here, so librdkafka's default
# (auto-commit on) applies. Offsets may be committed before the profiles are
# saved/uploaded; consider manual commits if at-least-once delivery matters.
consumer_config = {
    "bootstrap.servers": KAFKA_BROKER,
    "group.id": "finnhub_consumer_group",
    "auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_config)
consumer.subscribe([KAFKA_TOPIC])

# Decoded JSON objects, one per consumed message, in arrival order.
profiles = []
# Consumption stops once this many profiles have been collected.
MAX_PROFILES = 50

try:
    print("🟢 Waiting for messages from Kafka...")
    while len(profiles) < MAX_PROFILES:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        raw = msg.value()
        if raw is None:
            # A message without a payload has nothing to decode.
            print(f"Skipping message with empty payload at offset {msg.offset()}")
            continue
        try:
            data = json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as err:
            # One malformed message must not abort the whole batch.
            print(f"Skipping undecodable message at offset {msg.offset()}: {err}")
            continue
        if not isinstance(data, dict):
            # The CSV writer downstream needs mapping rows; skip other JSON values.
            print(f"Skipping non-object JSON payload at offset {msg.offset()}")
            continue

        profiles.append(data)
        print(f"Received profile: {data.get('symbol', '<no symbol>')}")
except KeyboardInterrupt:
    # Allow stopping early (e.g. interrupting the notebook cell) while still
    # going on to save whatever has been collected so far.
    print(f"⚠️ Interrupted; continuing with {len(profiles)} profile(s) collected")
finally:
    consumer.close()

# Save the collected profiles to a CSV named with the local-time timestamp
# (datetime.now() is naive local time) in the current working directory.
# Only the columns below are written. extrasaction="ignore" drops any other keys
# a profile carries; DictWriter's default ("raise") would abort mid-write with a
# ValueError and leave a partial file. Columns missing from a profile are
# written as empty cells.
# NOTE(review): the exact key set sent by the producer is not visible here.
CSV_FIELDNAMES = ["symbol", "name", "exchange", "industry", "website"]
filename = f"finnhub_profiles_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(filename, mode="w", newline="", encoding="utf-8") as file:
    writer = csv.DictWriter(file, fieldnames=CSV_FIELDNAMES, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(profiles)
print(f"✅ Saved {len(profiles)} profiles to CSV: {filename}")

# Upload the CSV to the bucket root, using the local filename as the object key.
# This is best-effort: any failure is reported and the script does not raise.
try:
    s3 = boto3.client("s3")
    s3.upload_file(filename, S3_BUCKET_NAME, filename)
except Exception as err:
    print(f"❌ Failed to upload CSV to S3: {err}")
else:
    print(f"✅ Uploaded CSV to S3 bucket '{S3_BUCKET_NAME}'")
