In [None]:
# --- Kafka source -----------------------------------------------------------
# The consumer runs on the EC2 broker box, so the broker is reached locally.
BOOTSTRAP = "localhost:9092"
TOPIC = "stocks_demo"

# --- S3 destination ---------------------------------------------------------
S3_BUCKET = "kafka-store-101231"
S3_PREFIX = "kafka_json"  # key prefix ("folder") the batches are written under
AWS_REGION = "eu-west-2"

# --- Micro-batch flush thresholds -------------------------------------------
BATCH_MAX = 200  # flush once this many messages are buffered...
FLUSH_SECS = 5   # ...or once this many seconds have passed since the last flush

In [None]:
import datetime
import json
import time
import uuid

import boto3
from kafka import KafkaConsumer

# Build the S3 client from a session pinned to AWS_REGION.
session = boto3.Session(region_name=AWS_REGION)
s3 = session.client("s3")


def _decode_json(raw):
    """Turn a raw Kafka message value (UTF-8 encoded JSON bytes) into a Python object."""
    return json.loads(raw.decode("utf-8"))


# Consumer that tails the live ticker topic.
#   * group_id is fixed so committed offsets belong to this sink.
#   * With no committed offsets, reading starts at the end of the topic.
#   * Auto-commit is off: offsets are committed by hand after a successful S3 write.
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=[BOOTSTRAP],
    value_deserializer=_decode_json,
    group_id="live-s3",
    auto_offset_reset="latest",
    enable_auto_commit=False,
)

# Records buffered since the previous flush, and the monotonic time of that flush.
batch = []
last_flush = time.monotonic()

def flush():
    """Upload the pending batch to S3 as JSON Lines, then commit Kafka offsets.

    When the module-level ``batch`` is empty, only the flush timer is reset.
    Otherwise every buffered record is serialized as one JSON document per
    line and written to a single object under
    ``{S3_PREFIX}/dt=YYYYMMDD/hour=HH/`` (UTC). Offsets are committed only
    after ``put_object`` has returned, so a failed upload leaves the batch in
    memory and the offsets uncommitted.

    Raises:
        Any exception raised by ``s3.put_object`` or ``consumer.commit``; in
        both cases the batch is not cleared.
    """
    global batch, last_flush
    if not batch:
        last_flush = time.monotonic()
        return

    # Timezone-aware equivalent of the deprecated datetime.utcnow().
    ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S")

    # The timestamp has one-second resolution, so two equally sized batches
    # flushed within the same second would share a key and the later upload
    # would silently overwrite the earlier one. A random suffix keeps every
    # object key distinct.
    suffix = uuid.uuid4().hex[:12]
    key = (
        f"{S3_PREFIX}/dt={ts[:8]}/hour={ts[9:11]}/"
        f"batch-{ts}-{len(batch)}-{suffix}.json"
    )

    body = "\n".join(json.dumps(r, ensure_ascii=False) for r in batch).encode("utf-8")
    s3.put_object(Bucket=S3_BUCKET, Key=key, Body=body)
    consumer.commit()  # commit only after the upload succeeds
    print(f"⬆️  Uploaded {len(batch)} → s3://{S3_BUCKET}/{key}")
    batch.clear()
    last_flush = time.monotonic()

print("✅ Tailing Kafka and writing micro-batches to S3…  (Ctrl+C to stop)")
try:
    while True:
        # Iterating the consumer blocks until the next message arrives, so on
        # a quiet topic the FLUSH_SECS deadline would never be checked and a
        # partial batch could sit in memory indefinitely. poll() returns after
        # at most timeout_ms, which lets the time-based flush fire when idle.
        remaining = FLUSH_SECS - (time.monotonic() - last_flush)
        timeout_ms = max(0, int(remaining * 1000) + 1)

        # Cap the fetch so a single poll cannot push the batch past BATCH_MAX.
        room = max(1, BATCH_MAX - len(batch))
        polled = consumer.poll(timeout_ms=timeout_ms, max_records=room)

        # poll() returns {TopicPartition: [records]}; values are already
        # deserialized by the consumer's value_deserializer.
        for records in polled.values():
            batch.extend(record.value for record in records)

        if len(batch) >= BATCH_MAX or (time.monotonic() - last_flush) >= FLUSH_SECS:
            flush()
except KeyboardInterrupt:
    # Write out whatever is still buffered before stopping.
    flush()
    print("🛑 Stopped.")