In [1]:
import json
import csv
import io
from collections import defaultdict
from kafka import KafkaConsumer
from s3fs import S3FileSystem

Kafka Config

In [None]:
# Bootstrap server address (host:port) handed to the Kafka consumer.
KAFKA_BROKER = 'localhost:9092'
# Name of the topic the consumer subscribes to.
TOPIC_NAME = 'csv_json_topic'

S3 Config

In [None]:
# Destination bucket for the CSV objects.
S3_BUCKET = 'ipl_data'
# URL prefix that each message's filename is appended to when building the object path.
S3_PREFIX = "s3://" + S3_BUCKET + "/"
# Number of rows buffered per file before the batch is flushed to S3.
BATCH_SIZE = 100

In [None]:
def _decode_json_message(raw_bytes):
    """Decode a raw message payload from UTF-8 bytes and parse it as JSON."""
    return json.loads(raw_bytes.decode('utf-8'))


# Consumer for TOPIC_NAME that starts from the earliest available offset.
# NOTE(review): no group_id is passed here; per the kafka-python docs offset
# commits are disabled when group_id is None, so enable_auto_commit presumably
# has no effect and a restart would re-read the topic from the start — confirm.
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BROKER,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=_decode_json_message,
)

S3 client

In [None]:
# Non-anonymous S3 filesystem handle used for all uploads below.
# NOTE(review): presumably picks up credentials from the environment/AWS config — confirm.
s3 = S3FileSystem(anon=False)

In [None]:
# Rows waiting to be flushed, keyed by the target filename taken from each message.
file_buffers: defaultdict = defaultdict(list)
# Filenames whose CSV header row has already been written during this session.
headers_written: set = set()

In [None]:
print("🟢 Kafka consumer with batching started...")

# Consume messages until interrupted. Each message value is expected to carry a
# 'filename' (target CSV object) and a 'data' mapping (one CSV row). Rows are
# buffered per filename and appended to S3 once BATCH_SIZE rows have accumulated.
try:
    for message in consumer:
        value = message.value
        filename = value['filename']
        row = value['data']

        # Add row to buffer
        batch = file_buffers[filename]
        batch.append(row)

        # Keep buffering until the batch is full
        if len(batch) < BATCH_SIZE:
            continue

        s3_path = f"{S3_PREFIX}{filename}"
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=row.keys())

        # Write header once per file
        needs_header = filename not in headers_written
        if needs_header:
            writer.writeheader()

        writer.writerows(batch)  # write batch

        # newline='' stops the text wrapper from translating the '\r\n' line
        # endings the csv module already emitted (would become '\r\r\n' on
        # Windows); an explicit encoding avoids depending on the locale default.
        with s3.open(s3_path, 'a', newline='', encoding='utf-8') as s3_file:
            s3_file.write(buffer.getvalue())

        # Record the header only after the upload succeeded, so a failed write
        # does not leave the file permanently without a header row.
        if needs_header:
            headers_written.add(filename)

        print(f"✅ Flushed {len(batch)} rows to {s3_path}")
        file_buffers[filename] = []  # clear buffer

except KeyboardInterrupt:
    # Rows still sitting in file_buffers are written by the final-flush cell.
    print("\n🛑 Interrupted. Flushing remaining rows...")

Final flush: write any rows still buffered after the consumer loop stops

In [None]:
# Append every partially filled batch left in file_buffers to its CSV object on
# S3. The cell is safe to re-run: flushed buffers are emptied and header state is
# recorded, so rows and headers are never appended twice.
for filename, rows in file_buffers.items():
    if not rows:
        continue

    s3_path = f"{S3_PREFIX}{filename}"
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=rows[0].keys())

    # The header is only needed if no earlier flush already wrote it.
    needs_header = filename not in headers_written
    if needs_header:
        writer.writeheader()

    writer.writerows(rows)

    # newline='' stops the text wrapper from translating the '\r\n' line endings
    # the csv module already emitted; an explicit encoding avoids depending on
    # the locale default.
    with s3.open(s3_path, 'a', newline='', encoding='utf-8') as s3_file:
        s3_file.write(buffer.getvalue())

    # Mirror the main loop: remember the header once the upload succeeded.
    if needs_header:
        headers_written.add(filename)

    print(f"🟡 Final flush: {len(rows)} rows to {s3_path}")

    # Empty the buffer in place (the dict itself is not modified while iterating)
    # so a re-run or a resumed consumer does not upload these rows again.
    rows.clear()

print("✅ Done.")