In [9]:
import pandas as pd
from kafka import KafkaConsumer
from time import sleep
from json import dumps, loads
import json
import boto3
import os
from dotenv import load_dotenv

In [10]:
load_dotenv()

# Kafka consumer for the stock-data topic. Connection settings are read from
# the environment (populated from .env by load_dotenv() above). os.environ[...]
# fails fast with a KeyError naming the missing variable instead of handing
# None to KafkaConsumer.
consumer = KafkaConsumer(
    os.environ["KAFKA_TOPIC"],
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
    # kafka-python also invokes the deserializer for null-payload (tombstone)
    # records; map those to None instead of crashing on None.decode(), so the
    # consumer loop's `message.value is None` check can skip them.
    value_deserializer=lambda raw: None if raw is None else loads(raw.decode("utf-8")),
)

In [14]:
# S3 client used for the chunk uploads. Credentials are read from the
# environment (populated by load_dotenv() in the consumer cell); a variable
# that is not set is passed through as None.
s3 = boto3.client(
    service_name="s3",
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
)

In [15]:
# Batching parameters and mutable state for the upload loop.
# Re-run this cell before re-running the loop: otherwise chunk numbering and
# any buffered records carry over from the previous run.
chunk_counter = 0   # running chunk number, used to build the S3 key
current_chunk = []  # records buffered since the last upload
chunk_size = 100    # number of records per uploaded S3 object

In [17]:
# Consume stock records from Kafka and upload them to S3 in JSON chunks of
# `chunk_size` records. Iterating the consumer blocks until interrupted
# (e.g. KeyboardInterrupt), so the final partial chunk is flushed in
# `finally` rather than after the loop, where it would never be reached.
S3_BUCKET = os.getenv("AWS_S3_BUCKET")


def upload_chunk(records, index):
    """Upload `records` as JSON to `stock-data/chunk_<index>.json` in S3_BUCKET.

    Propagates any exception raised by `s3.put_object`.
    """
    s3.put_object(
        Bucket=S3_BUCKET,
        Key=f"stock-data/chunk_{index}.json",
        Body=json.dumps(records),
    )


try:
    for message in consumer:
        if message.value is None:
            print("Received empty message, skipping")
            continue

        current_chunk.append(message.value)

        # `>=` rather than `==`: after a failed upload the buffer is kept and
        # keeps growing past chunk_size; an equality check would never fire
        # again and the records would never be retried.
        if len(current_chunk) >= chunk_size:
            print("Received message:", message.value)
            try:
                upload_chunk(current_chunk, chunk_counter + 1)
            except Exception as e:
                # Keep the buffered records; the next message retries the upload.
                print(f"Failed to upload chunk {chunk_counter + 1}: {e}")
            else:
                # Only count chunks that actually reached S3, so keys stay contiguous.
                chunk_counter += 1
                print(f"Uploaded chunk {chunk_counter} : {len(current_chunk)} records to S3")
                current_chunk = []
finally:
    # Runs on normal exit and on KeyboardInterrupt, so buffered records are flushed.
    if current_chunk:
        try:
            upload_chunk(current_chunk, chunk_counter + 1)
        except Exception as e:
            # Records stay in current_chunk so they can be re-uploaded manually.
            print(f"Failed to upload chunk {chunk_counter + 1}: {e}")
        else:
            chunk_counter += 1
            print(f"Uploaded final chunk {chunk_counter} : {len(current_chunk)} records to S3")
            current_chunk = []

Received message: {'Index': 'IXIC', 'Date': '1979-07-23', 'Open': 137.570007, 'High': 137.570007, 'Low': 137.570007, 'Close': 137.570007, 'Adj Close': 137.570007, 'Volume': 0.0, 'CloseUSD': 137.570007}
Uploaded chunk 2 : 100 records to S3
Received message: {'Index': 'IXIC', 'Date': '1979-12-12', 'Open': 148.509995, 'High': 148.509995, 'Low': 148.509995, 'Close': 148.509995, 'Adj Close': 148.509995, 'Volume': 0.0, 'CloseUSD': 148.509995}
Uploaded chunk 3 : 100 records to S3
Received message: {'Index': 'IXIC', 'Date': '1980-05-06', 'Open': 142.229996, 'High': 142.229996, 'Low': 142.229996, 'Close': 142.229996, 'Adj Close': 142.229996, 'Volume': 0.0, 'CloseUSD': 142.229996}
Uploaded chunk 4 : 100 records to S3
Received message: {'Index': 'IXIC', 'Date': '1980-09-26', 'Open': 190.770004, 'High': 190.770004, 'Low': 190.770004, 'Close': 190.770004, 'Adj Close': 190.770004, 'Volume': 0.0, 'CloseUSD': 190.770004}
Uploaded chunk 5 : 100 records to S3
Received message: {'Index': 'IXIC', 'Date': 

KeyboardInterrupt: 