In [None]:
from kafka import KafkaConsumer
from time import sleep
from json import loads, dumps
import json
import logging
from s3fs import S3FileSystem


In [None]:
# Define Kafka broker (update IP or use localhost for testing)
KAFKA_BROKER = "localhost:9092"  # Change this to your actual Kafka broker

# Build the consumer for the 'demo_test' topic. Each message value is decoded
# from UTF-8 bytes and parsed as JSON by the deserializer below.
try:
    consumer = KafkaConsumer(
        'demo_test',
        bootstrap_servers=[KAFKA_BROKER],
        value_deserializer=lambda x: loads(x.decode('utf-8'))
    )
except Exception as e:
    print(f"Error connecting to Kafka: {e}")
    # Re-raise instead of swallowing: without a consumer the following cells
    # cannot work, and continuing would either hit a NameError or silently
    # reuse a stale `consumer` left in the kernel by an earlier run.
    raise
else:
    print("Kafka Consumer connected successfully!")


In [None]:
# Configure logging
logging.basicConfig(level=logging.INFO)

# Number of messages to log before this cell finishes. Iterating the consumer
# without a bound keeps this cell running until the kernel is interrupted, so
# the cells below would never execute under "Run All".
# Set to None to restore the unbounded behaviour.
PREVIEW_LIMIT = 5

# Log received messages from Kafka topic.
# NOTE: messages logged here have been taken from `consumer`; a later loop over
# the same consumer object continues after them rather than seeing them again.
for preview_count, message in enumerate(consumer, start=1):
    logging.info(f"Received: {message.value}")
    # Stop right after logging the last wanted message so that no extra
    # message is pulled from the consumer and discarded.
    if PREVIEW_LIMIT is not None and preview_count >= PREVIEW_LIMIT:
        break


In [None]:
# Handle used for all S3 reads and writes in this notebook
s3 = S3FileSystem()

# Target bucket for the exported batches
BUCKET_NAME = "kafka-stock-market-tutorial-youtube-darshil"

# Report whether the bucket is reachable; this only prints a status line and
# does not stop the notebook when the bucket is missing.
bucket_url = f"s3://{BUCKET_NAME}/"
if s3.exists(bucket_url):
    print(f"Connected to S3 bucket '{BUCKET_NAME}'.")
else:
    print(f"Error: S3 bucket '{BUCKET_NAME}' does not exist.")


In [None]:
# Buffer messages before writing to S3 to avoid too many small files
buffer = []
batch_size = 10  # Adjust batch size based on expected data frequency


def _write_batch(records, s3_path):
    """Write `records` as a single JSON array to `s3_path`.

    Returns True when the object was written, False when the write failed.
    Failures are logged rather than raised so the consuming loop keeps running.
    """
    try:
        with s3.open(s3_path, 'w') as file:
            json.dump(records, file)
    except Exception as e:
        logging.error(f"Error writing to S3: {e}")
        return False
    logging.info(f"Saved batch to {s3_path}")
    return True


# NOTE(review): `count` restarts at 0 every time this cell runs, so a rerun
# produces the same file names as the previous run — confirm whether
# overwriting earlier batches is acceptable.
try:
    for count, message in enumerate(consumer):
        buffer.append(message.value)

        if len(buffer) >= batch_size:
            file_name = f"stock_market_batch_{count}.json"
            s3_path = f"s3://{BUCKET_NAME}/{file_name}"

            # Only drop the buffered messages once they are safely stored.
            # After a failed write the buffer is kept, and the write is
            # retried (with the extra message) on the next iteration.
            if _write_batch(buffer, s3_path):
                buffer = []
finally:
    # The loop ends on kernel interrupt or on an error from the consumer.
    # Flush whatever is still buffered so a partial batch is not lost.
    # `buffer` is non-empty only if at least one message arrived, so `count`
    # is defined here.
    if buffer:
        file_name = f"stock_market_batch_{count}.json"
        s3_path = f"s3://{BUCKET_NAME}/{file_name}"
        if _write_batch(buffer, s3_path):
            buffer = []
