In [None]:
from confluent_kafka import Consumer, KafkaError
import json
from s3fs import S3FileSystem

# Connection details, named up front so they are easy to find and change
broker_address = '3.109.153.20:9092'  # host:port of the Kafka broker to bootstrap from
topic_names = ['demo_stock_project']  # topics the consumer subscribes to

# Settings passed to the Kafka consumer
conf = {}
conf['bootstrap.servers'] = broker_address
conf['group.id'] = 'my_consumer_group'  # consumer group this client joins
conf['auto.offset.reset'] = 'earliest'  # with no stored offset, start at the beginning of the topic

# Build the consumer and register its topic subscription
consumer = Consumer(conf)
consumer.subscribe(topic_names)

# S3 filesystem handle used for the uploads
s3 = S3FileSystem()

# Function to consume messages and upload to S3
def consume_messages_and_upload_to_s3(bucket_name):
    """Poll Kafka and store every JSON message as its own object in S3.

    Relies on the module-level ``consumer`` (already subscribed to its topic)
    and the module-level ``s3`` filesystem. Each message that parses as JSON
    is written to
    ``s3://<bucket_name>/stock_market_data/stock_market_<n>.json``, where
    ``n`` starts at 0 and only advances after a successful upload.

    Messages that cannot be used (empty payload, invalid UTF-8, invalid JSON)
    are reported and skipped. The loop stops on the first Kafka error other
    than an end-of-partition event. The consumer is closed on every exit
    path, including exceptions and ``KeyboardInterrupt``.

    Args:
        bucket_name: Name of the destination S3 bucket, without the
            ``s3://`` prefix.
    """
    print("Waiting for messages...")
    count = 0
    try:
        while True:
            msg = consumer.poll(timeout=1.0)  # Wait up to 1 second for a message
            if msg is None:
                continue  # Nothing arrived within the timeout, poll again

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    # End of partition is informational, keep polling
                    continue
                print(f"Error occurred: {error}")
                break

            payload = msg.value()
            if payload is None:
                # A message may carry no value; there is nothing to upload
                print("Skipping message with empty payload")
                continue

            # Keep the try body limited to the steps that can fail on bad input
            try:
                message_value = json.loads(payload.decode('utf-8'))
            except UnicodeDecodeError as e:
                print(f"Failed to decode message as UTF-8: {e} - Received value: {payload!r}")
                continue
            except json.JSONDecodeError as e:
                print(f"Failed to decode JSON: {e} - Received value: {payload.decode('utf-8')}")
                continue

            # Build the object key from the bucket passed in by the caller
            key = f"s3://{bucket_name}/stock_market_data/stock_market_{count}.json"

            # Upload the JSON data to S3
            with s3.open(key, 'w') as file:
                json.dump(message_value, file)

            print(f"Uploaded message to {key}")
            count += 1
    finally:
        # Always leave the consumer group cleanly, even if the loop raised
        consumer.close()
        print("Consumer closed.")

# Start the consume-and-upload loop; replace the value below with your S3 bucket name
consume_messages_and_upload_to_s3(
    bucket_name='stock-market-project-test-ukesh-bucket',
)


Waiting for messages...
Uploaded message to s3://stock-market-project-test-ukesh-bucket/stock_market_data/stock_market_0.json
Uploaded message to s3://stock-market-project-test-ukesh-bucket/stock_market_data/stock_market_1.json
Uploaded message to s3://stock-market-project-test-ukesh-bucket/stock_market_data/stock_market_2.json
Uploaded message to s3://stock-market-project-test-ukesh-bucket/stock_market_data/stock_market_3.json
Uploaded message to s3://stock-market-project-test-ukesh-bucket/stock_market_data/stock_market_4.json
Uploaded message to s3://stock-market-project-test-ukesh-bucket/stock_market_data/stock_market_5.json
Uploaded message to s3://stock-market-project-test-ukesh-bucket/stock_market_data/stock_market_6.json
Uploaded message to s3://stock-market-project-test-ukesh-bucket/stock_market_data/stock_market_7.json
Uploaded message to s3://stock-market-project-test-ukesh-bucket/stock_market_data/stock_market_8.json
Uploaded message to s3://stock-market-project-test-ukesh-b