# Big Data Lab — Modules 1–3 Starter Notebook

This notebook walks through a lightweight, local starter for:
- Kafka topic creation, producing and consuming test messages (JSON path)
- Optional notes for Schema Registry / Avro
- A PySpark Structured Streaming job that reads from Kafka, performs an event-time aggregation with watermarking, and writes Parquet output with checkpointing.

Assumptions:
- You're running the lab stack from the lab docker-compose (kafka at `kafka:9092`, schema-registry at `http://schema-registry:8081`, MinIO at `minio:9000` if used).
- This notebook runs in the `jupyter/pyspark-notebook` container and can execute PySpark and pip installs.

Run cells sequentially.

In [ ]:
###########################
# Install Python dependencies
###########################
# The container may already have some packages; install confluent-kafka (Kafka admin,
# producer and consumer clients) and fastavro (Avro encoding) only if they are missing.
import sys
import subprocess
import importlib.util

def pip_install(packages):
    cmd = [sys.executable, '-m', 'pip', 'install', '--quiet'] + packages
    subprocess.check_call(cmd)

# importlib.util.find_spec replaces pkgutil.find_loader, which is deprecated since Python 3.12.
# Keys are import names, values are pip package names.
packages = {'confluent_kafka': 'confluent-kafka', 'fastavro': 'fastavro'}
required = [pip_name for module, pip_name in packages.items() if importlib.util.find_spec(module) is None]
if required:
    print('Installing:', required)
    pip_install(required)
else:
    print('Required packages already installed.')


In [ ]:
###########################
# Kafka: Admin check and topic creation
###########################
from confluent_kafka.admin import AdminClient, NewTopic
import time

KAFKA_BOOTSTRAP = 'kafka:9092'
admin_conf = {'bootstrap.servers': KAFKA_BOOTSTRAP}
admin = AdminClient(admin_conf)

print('Listing existing topics (timeout=5s):')
md = admin.list_topics(timeout=5)
print(', '.join(sorted(list(md.topics.keys()))))

topic_name = 'events'
num_partitions = 6
replication_factor = 1

if topic_name not in md.topics:
    print(f"Creating topic '{topic_name}'...")
    new_topic = NewTopic(topic=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
    fs = admin.create_topics([new_topic])
    # Wait for creation
    try:
        fs[topic_name].result(10)
        print('Topic created')
    except Exception as e:
        print('Topic creation failed or already exists:', e)
else:
    print(f"Topic '{topic_name}' already exists")


In [ ]:
###########################
# Produce sample JSON messages to Kafka topic
###########################
from confluent_kafka import Producer
import json
import random
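from datetime import datetime, timedelta, timezone

pconf = {'bootstrap.servers': KAFKA_BOOTSTRAP}
producer = Producer(pconf)

def delivery_report(err, msg):
    # Called once per message from poll()/flush(); only failures are printed.
    if err is not None:
        print('Delivery failed:', err)

print('Producing 20 test messages...')
# Timezone-aware UTC (datetime.utcnow() is deprecated since Python 3.12).
now = datetime.now(timezone.utc)
for i in range(20):
    ev = {
        'id': str(i),
        'user_id': f'user_{random.randint(1, 5)}',
        'event_type': random.choice(['click', 'view', 'purchase']),
        # ISO-8601 with a trailing 'Z' (e.g. 2024-01-01T12:00:00.000Z), within the last 10 minutes
        'event_time': (now - timedelta(seconds=random.randint(0, 600))).isoformat(timespec='milliseconds').replace('+00:00', 'Z'),
        'value': random.random()
    }
    producer.produce(topic_name, key=ev['user_id'], value=json.dumps(ev), callback=delivery_report)
    # Serve delivery callbacks for messages sent so far
    producer.poll(0)

# flush() blocks until all messages are delivered or the timeout expires; it returns the number still queued.
remaining = producer.flush(10)
if remaining:
    print(f'{remaining} message(s) were not delivered within 10 seconds')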
print('Done producing messages.')
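Every message above is keyed by `user_id`, so all events for one user go to the same partition. The sketch below models librdkafka's documented default partitioner (`consistent_random`: CRC32 of the key modulo the partition count, for non-empty keys) with `zlib.crc32`. That it reproduces librdkafka's exact partition numbers is an assumption, and `crc32_partition` is a hypothetical helper; to see real placements, print `msg.partition()` in a consumer or in the delivery callback.

```python
import zlib

NUM_PARTITIONS = 6  # same as num_partitions in the topic-creation cell


def crc32_partition(key: bytes, num_partitions: int) -> int:
    """Hypothetical model of a CRC32-based partitioner for non-empty keys."""
    return zlib.crc32(key) % num_partitions


users = [f'user_{i}' for i in range(1, 6)]  # the producer draws user_1 .. user_5
mapping = {u: crc32_partition(u.encode('utf-8'), NUM_PARTITIONS) for u in users}
print(mapping)
print('Partitions that would receive data:', sorted(set(mapping.values())))
```

With only five distinct keys, at most five of the six partitions can ever hold data, and fewer if two users hash to the same partition.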


In [ ]:
###########################
# Consume messages (quick check)
###########################
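from confluent_kafka import Consumer, KafkaException
import time
import uuid

cconf = {
    'bootstrap.servers': KAFKA_BOOTSTRAP,
    # A fresh group id per run, so 'earliest' applies and the topic is re-read from the start.
    'group.id': f'notebook-checker-{uuid.uuid4().hex[:8]}',
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(cconf)
consumer.subscribe([topic_name])
print('Polling up to 10 messages from topic...')
count = 0
# Joining the group and receiving a partition assignment can take a few seconds,
# so poll until a deadline instead of stopping at the first empty poll.
deadline = time.time() + 15
try:
    while count < 10 and time.time() < deadline:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        key = msg.key().decode('utf-8') if msg.key() else None
        print('Partition:', msg.partition(), 'Key:', key, 'Value:', msg.value().decode('utf-8'))
        count += 1
finally:
    consumer.close()
print(f'Read {count} message(s).')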


## PySpark Structured Streaming: read from Kafka, aggregate, write Parquet

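This cell creates a streaming query that:
- Reads from Kafka (topic `events`)
- Parses JSON payloads
- Uses event time (`event_time`) with a 10-minute watermark
- Counts events per `event_type` in 5-minute tumbling windows

By default it writes Parquet files locally under `./data/stream_output` and uses `./checkpoints/events` for checkpointing. Spark's Kafka source is not bundled with PySpark: the `spark-sql-kafka-0-10` connector matching your Spark and Scala versions must be on the classpath (for example via `spark.jars.packages`), or `format('kafka')` fails to find the data source.

The Parquet sink only supports `append` output mode, in which a window's count is written once, after the watermark (latest event time seen minus 10 minutes) passes the window's end. The sample events all fall within the last 10 minutes, so a short run over them alone may write no rows; produce events with later timestamps while the query runs, or use the `console` sink with `outputMode('update')` to watch counts change as they arrive.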

If you want to write to MinIO (S3-compatible), see the final notes cell for S3A configuration and required jars.
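The append-mode rule can be modeled in plain Python to see which windows would be written. This is a simplified sketch, not Spark's implementation: it treats all events as a single batch, takes the watermark as the latest event time minus 10 minutes, and emits a window only when its end is at or before that watermark. Spark additionally drops late rows and applies the watermark computed in the previous micro-batch, which this ignores. `window_start` and `finalized_counts` are hypothetical helpers.

```python
import random
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=5)            # tumbling window size used in the Spark job
WATERMARK_DELAY = timedelta(minutes=10)  # delay passed to withWatermark
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its epoch-aligned tumbling window."""
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW


def finalized_counts(events):
    """Simplified single-batch model of append mode.

    events: iterable of (event_time, event_type) pairs.
    Returns {(window_start, event_type): count} for windows whose end is at or
    before the watermark, i.e. the rows append mode would have written.
    """
    events = list(events)
    if not events:
        return {}
    watermark = max(t for t, _ in events) - WATERMARK_DELAY
    counts = Counter((window_start(t), et) for t, et in events)
    return {key: n for key, n in counts.items() if key[0] + WINDOW <= watermark}


# Sample shaped like the producer cell: timestamps within the last 600 seconds.
now = datetime.now(timezone.utc)
sample = [(now - timedelta(seconds=random.randint(0, 600)), random.choice(['click', 'view', 'purchase']))
          for _ in range(20)]
print(finalized_counts(sample))  # -> {} : no window has closed yet
```

Every sample event is at most 600 seconds old, so each window's end is later than the watermark and nothing is emitted. Adding a single event 20 minutes later moves the watermark forward and finalizes the earlier windows.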


In [ ]:
###########################
# Streaming job
###########################
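import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StringType, DoubleType

# The Kafka source ships separately from PySpark. These coordinates assume a Scala 2.12
# build (the Spark 3.x default); use _2.13 for Spark 4.x. The package is downloaded from
# Maven Central on first use, and the setting only takes effect when the SparkSession
# (and its JVM) is first created in this kernel.
KAFKA_PKG = f'org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}'

spark = (
    SparkSession.builder
    .appName('notebook-streaming-example')
    .config('spark.jars.packages', KAFKA_PKG)
    .getOrCreate()
)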

# Define schema for JSON payload
schema = StructType() \
    .add('id', StringType()) \
    .add('user_id', StringType()) \
    .add('event_type', StringType()) \
    .add('event_time', StringType()) \
    .add('value', DoubleType())

kafka_df = (
    spark.readStream.format('kafka')
    .option('kafka.bootstrap.servers', KAFKA_BOOTSTRAP)
    .option('subscribe', topic_name)
    .option('startingOffsets', 'earliest')
    .load()
)
# value is in bytes; cast to string
json_df = kafka_df.select(from_json(col('value').cast('string'), schema).alias('data')).select('data.*')

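# Convert event_time to TimestampType. Without a format string, to_timestamp follows Spark's cast
# rules, which accept ISO-8601 strings such as 2024-01-01T12:00:00.000Z; pass a pattern for other formats.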
from pyspark.sql.functions import to_timestamp
events = json_df.withColumn('event_time', to_timestamp(col('event_time')))

# Windowed aggregation: 5-minute tumbling windows with 10-minute watermark for late data
agg = (
    events.withWatermark('event_time', '10 minutes')
    .groupBy(window(col('event_time'), '5 minutes'), col('event_type'))
    .count()
)

output_path = './data/stream_output'
checkpoint_path = './checkpoints/events'
os.makedirs(output_path, exist_ok=True)
os.makedirs(checkpoint_path, exist_ok=True)

query = (
    agg.writeStream
    .format('parquet')
    .outputMode('append')
    .option('path', output_path)
    .option('checkpointLocation', checkpoint_path)
    .trigger(processingTime='10 seconds')
    .start()
)
print('Streaming query started. Waiting for ~30 seconds to collect data...')
query.awaitTermination(30)
print('Stopping query...')
query.stop()
print('Query stopped.')


In [ ]:
###########################
# Verify Parquet output files
###########################
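import glob
import os

# The file sink writes part files directly under output_path (plus a _spark_metadata log directory).
files = sorted(glob.glob(os.path.join(output_path, '**', '*.parquet'), recursive=True))
if files:
    print(f'Parquet files written ({len(files)} total, first 10 shown):')
    for f in files[:10]:
        print('-', f)
else:
    print('No Parquet files yet: in append mode a window is written only after the watermark passes its end.')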

print('\nYou can open the Parquet files using PySpark or pandas (pyarrow).')


## Notes & next steps

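1. Schema Registry / Avro: to use Avro-encoded messages with Schema Registry, register the Avro schema under a subject (via the Schema Registry REST API, or let the serializer register it automatically) and produce with an Avro serializer; this notebook uses JSON for simplicity. In current confluent-kafka releases that serializer is `confluent_kafka.schema_registry.avro.AvroSerializer`; the older `confluent_kafka.avro.AvroProducer` is deprecated. On the Spark side, Avro values can be decoded with `from_avro`, which requires the `spark-avro` package.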
2. Writing to MinIO (S3-compatible): to write Parquet directly to MinIO/S3 from Spark, put the `hadoop-aws` jar matching Spark's Hadoop version, plus the AWS SDK bundle that version depends on, on Spark's classpath. Then set the MinIO endpoint, access key, secret key, and path-style access in the Hadoop configuration, for example:
   - `spark._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 'http://minio:9000')`
   - `spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', 'minioadmin')`
   - `spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', 'minioadmin')`
   - `spark._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')`

3. Checkpointing: the notebook uses a local checkpoint directory. For resilience in the full lab, use a durable store (MinIO path) for checkpoints so jobs can recover across container restarts.
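For the Avro note (item 1): messages written by Confluent's serializers use a small wire format, a magic byte `0` followed by the 4-byte big-endian schema ID and then the Avro-encoded body. The sketch below frames and unframes that header with `struct`; `frame` and `unframe` are hypothetical helpers, and `b'<avro-bytes>'` is a placeholder standing in for a real Avro-encoded record.

```python
import struct

MAGIC_BYTE = 0
HEADER = struct.Struct('>bI')  # 1-byte magic + 4-byte big-endian schema ID = 5 bytes


def frame(schema_id: int, avro_body: bytes) -> bytes:
    """Prefix an Avro-encoded body with the Confluent wire-format header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_body


def unframe(message: bytes) -> tuple[int, bytes]:
    """Split a framed message into (schema_id, avro_body)."""
    if len(message) < HEADER.size:
        raise ValueError('message is shorter than the 5-byte header')
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f'unexpected magic byte: {magic}')
    return schema_id, message[HEADER.size:]


msg = frame(42, b'<avro-bytes>')
print(msg[:5].hex(), unframe(msg))  # -> 000000002a (42, b'<avro-bytes>')
```

Spark's `from_avro` expects the bare Avro body, so one common approach is to strip the header first, e.g. `from_avro(expr('substring(value, 6, length(value) - 5)'), schema_json)` with `from_avro` imported from `pyspark.sql.avro.functions`. This assumes every message uses the same schema, since the schema ID is discarded.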

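The MinIO settings in item 2 can also be supplied as Spark properties: any property prefixed with `spark.hadoop.` is copied into the Hadoop configuration, which avoids the internal `spark._jsc` handle. The helper below is a hypothetical sketch that only builds the dictionary. The credentials are the MinIO defaults from the note; `fs.s3a.connection.ssl.enabled` and `fs.s3a.impl` are extra settings added here as assumptions for a plain-HTTP MinIO.

```python
def s3a_spark_conf(endpoint: str, access_key: str, secret_key: str) -> dict:
    """Return Spark properties that configure Hadoop's S3A connector (hypothetical helper)."""
    prefix = 'spark.hadoop.fs.s3a.'  # spark.hadoop.* properties are copied into the Hadoop Configuration
    return {
        prefix + 'endpoint': endpoint,
        prefix + 'access.key': access_key,
        prefix + 'secret.key': secret_key,
        # MinIO is addressed as http://host:port/bucket rather than bucket.host
        prefix + 'path.style.access': 'true',
        # Assumption: TLS follows the endpoint scheme (the lab's MinIO is plain http)
        prefix + 'connection.ssl.enabled': 'true' if endpoint.startswith('https://') else 'false',
        # Assumption: explicit filesystem class; harmless if already registered
        prefix + 'impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    }


conf = s3a_spark_conf('http://minio:9000', 'minioadmin', 'minioadmin')
for key, value in sorted(conf.items()):
    print(f'{key}={value}')
```

Apply the properties before the session is created, e.g. `for k, v in conf.items(): builder = builder.config(k, v)`, then use paths such as `s3a://<bucket>/stream_output` for output and checkpoints. The bucket must already exist, and the `hadoop-aws` jars are still required.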
Possible next steps:
- add Avro + Schema Registry cells (producer and Spark-side parsing);
- add MinIO S3A configuration cells and instructions for including the hadoop-aws jars in Spark;
- produce a spark-submit-ready Python script version of the streaming job.