# Kafka Consumer — Stock Market Data Stream

This notebook acts as a **Kafka consumer**: it subscribes to the topic `stock-market-index`, reads JSON messages produced by the producer notebook (or any compatible producer), and prints them. Optionally, it can persist each message to an S3 bucket as a JSON file for later use with AWS Glue and Athena. Make sure both notebooks point at the same broker and topic, and start the producer notebook first (or have it already streaming) before running this consumer. Each consuming loop below runs until you interrupt the kernel, so stop one loop before running the next cell.

In [None]:
%pip install kafka-python s3fs

In [None]:
import json
from json import loads

from kafka import KafkaConsumer  # type: ignore
from s3fs import S3FileSystem  # type: ignore

In [None]:
# Configuration — change these to match your environment
BOOTSTRAP_SERVERS = ['localhost:9092']  # e.g. ['<EC2_IP>:9092'] for remote broker
TOPIC_NAME = 'stock-market-index'
# Optional: for S3 persistence (replace the placeholders; never commit real credentials)
AWS_REGION = 'us-east-1'
S3_ACCESS_KEY = '<YOUR_ACCESS_KEY>'
S3_SECRET_KEY = '<YOUR_SECRET_KEY>'
S3_BUCKET = '<YOUR_BUCKET_NAME>'
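
Hardcoded credentials are easy to leak when a notebook is shared. As a minimal sketch, the same settings could be read from environment variables instead. The helper `load_config` and the variable names `KAFKA_BOOTSTRAP_SERVERS`, `KAFKA_TOPIC`, and `S3_BUCKET` are assumptions made for illustration; `AWS_DEFAULT_REGION` is the variable the AWS SDKs conventionally read.

```python
import os


def load_config(env=None):
    """Build the consumer settings from environment variables, falling back to the notebook defaults."""
    env = os.environ if env is None else env
    servers = env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
    return {
        # Accept a comma-separated broker list, e.g. "host1:9092,host2:9092"
        "bootstrap_servers": [s.strip() for s in servers.split(",") if s.strip()],
        "topic": env.get("KAFKA_TOPIC", "stock-market-index"),
        "aws_region": env.get("AWS_DEFAULT_REGION", "us-east-1"),
        "s3_bucket": env.get("S3_BUCKET"),  # None when S3 persistence is not configured
    }


config = load_config()
```

When `key` and `secret` are omitted, `S3FileSystem` should fall back to the standard AWS credential chain (environment variables, shared credentials file, or an instance role), so the access keys need not appear in the notebook at all.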

In [None]:
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=BOOTSTRAP_SERVERS,
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)
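
The lambda deserializer above raises on the first payload that is not valid UTF-8 JSON, which stops the consuming loop. The following is a hedged sketch of a more tolerant setup; `safe_json_deserializer` and `build_consumer` are hypothetical helper names, and the 10-second timeout is an arbitrary choice. `auto_offset_reset` and `consumer_timeout_ms` are existing `KafkaConsumer` options.

```python
import json


def safe_json_deserializer(raw):
    """Decode a message payload as UTF-8 JSON; return None instead of raising on bad input."""
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


def build_consumer(topic, bootstrap_servers):
    """Create a consumer that tolerates malformed messages and stops iterating when idle."""
    # Imported here so the deserializer above can be used without kafka-python installed.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=safe_json_deserializer,
        auto_offset_reset="earliest",  # start from the oldest retained message when no offset is committed
        consumer_timeout_ms=10_000,    # end iteration after 10 s without a new message
    )
```

With this variant, a loop such as `for message in build_consumer(TOPIC_NAME, BOOTSTRAP_SERVERS)` ends on its own once the topic goes quiet, and malformed messages surface as `message.value is None` rather than as exceptions.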

In [None]:
# Print each message as it arrives. This loop blocks until the kernel is interrupted.
for message in consumer:
    print(message.value)
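
Because iterating over the consumer never finishes by default, a quick sanity check may be easier with a bounded read. A minimal sketch using `itertools.islice`; the helper name `take_messages` is hypothetical, and it works on any iterable whose items expose a `.value` attribute, as Kafka consumer records do.

```python
from itertools import islice


def take_messages(records, limit):
    """Return the values of at most `limit` records from an iterable of consumer records."""
    return [record.value for record in islice(records, limit)]
```

For example, `take_messages(consumer, 5)` returns the next five message values and then hands control back to the notebook. It still waits until five messages have arrived unless the consumer was created with a `consumer_timeout_ms`.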

In [None]:
# Create the S3 filesystem client with the AWS credentials configured above
s3 = S3FileSystem(
    key=S3_ACCESS_KEY,
    secret=S3_SECRET_KEY,
    client_kwargs={'region_name': AWS_REGION}
)

In [None]:
# List bucket contents to verify S3 connectivity
print(s3.ls(S3_BUCKET))

In [None]:
# Print each message together with a running count
for count, message in enumerate(consumer):
    print(count)
    print(message.value)

In [None]:
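# Persist each consumed message to S3 as JSON.
# Note: count restarts at 0 on every run, so rerunning this cell overwrites earlier files.
for count, message in enumerate(consumer):
    with s3.open("s3://{}/stock_market_{}.json".format(S3_BUCKET, count), 'w') as file:
        json.dump(message.value, file)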
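
Numbering files with a loop counter means every run starts again at `stock_market_0.json` and overwrites earlier objects. One possible alternative, sketched here, names each file after the record's partition and offset, which together identify a message within a topic. The helpers `s3_path_for` and `persist_record` are hypothetical; `fs` is assumed to be any object with an s3fs-style `open(path, mode)` method, such as the `s3` client created above.

```python
import json


def s3_path_for(bucket, record):
    """Derive a stable object path from the record's partition and offset."""
    return "s3://{}/stock_market_{}_{}.json".format(
        bucket, record.partition, record.offset
    )


def persist_record(fs, bucket, record):
    """Write one record's value as JSON and return the path it was written to."""
    path = s3_path_for(bucket, record)
    with fs.open(path, "w") as file:
        json.dump(record.value, file)
    return path
```

In the loop above this would be called as `persist_record(s3, S3_BUCKET, message)`. Reprocessing the same message then rewrites the same object instead of creating a duplicate under a new number.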