# Big Data — PySpark DataFrames + Kafka (confluent-kafka)
This notebook demonstrates:
- Basic **PySpark DataFrame** creation and transformations
- A simple **Kafka Producer** and **Kafka Consumer** using the `confluent-kafka` Python library
- Converting consumed Kafka messages into a Spark DataFrame

Assumptions:
- You are running inside the course Docker Compose network.
- Kafka broker is reachable at `kafka:9092` (default), or via the env var `KAFKA_BOOTSTRAP_SERVERS`.


In [1]:

# --- Config ---
import os, json, time, uuid

KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
TOPIC = os.getenv("KAFKA_TOPIC", "sid45_demo")
GROUP_ID = os.getenv("KAFKA_GROUP_ID", "sid45_demo_group")

print("KAFKA_BOOTSTRAP_SERVERS =", KAFKA_BOOTSTRAP)
print("TOPIC =", TOPIC)
print("GROUP_ID =", GROUP_ID)


KAFKA_BOOTSTRAP_SERVERS = kafka:9092
TOPIC = sid45_demo
GROUP_ID = sid45_demo_group


## 1) PySpark: create and transform DataFrames

In [2]:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = (SparkSession.builder
         .appName("sid45-pyspark-kafka-demo")
         # For cluster mode (Spark standalone in Compose), uncomment:
         # .master("spark://spark-master:7077")
         # For local mode (often simpler for a quick demo), keep as default:
         .getOrCreate())

spark


In [3]:

# Create a small DataFrame
data = [
    ("u1", "click",  1.2),
    ("u2", "view",   0.4),
    ("u1", "purchase", 5.0),
    ("u3", "view",   0.2),
    ("u2", "click",  0.9),
]
schema = T.StructType([
    T.StructField("user_id", T.StringType(), False),
    T.StructField("event", T.StringType(), False),
    T.StructField("value", T.DoubleType(), False),
])

df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)
df.printSchema()


+-------+--------+-----+
|user_id|event   |value|
+-------+--------+-----+
|u1     |click   |1.2  |
|u2     |view    |0.4  |
|u1     |purchase|5.0  |
|u3     |view    |0.2  |
|u2     |click   |0.9  |
+-------+--------+-----+

root
 |-- user_id: string (nullable = false)
 |-- event: string (nullable = false)
 |-- value: double (nullable = false)



In [4]:

# Simple transformations: filter, groupBy, aggregation, and a derived column
df2 = (df
       .withColumn("event_upper", F.upper(F.col("event")))
       .filter(F.col("value") >= 0.5))

agg = (df2
       .groupBy("user_id")
       .agg(
           F.count("*").alias("n_events"),
           F.sum("value").alias("sum_value"),
           F.collect_set("event_upper").alias("event_types")
       )
       .orderBy(F.desc("sum_value")))

df2.show(truncate=False)
agg.show(truncate=False)


+-------+--------+-----+-----------+
|user_id|event   |value|event_upper|
+-------+--------+-----+-----------+
|u1     |click   |1.2  |CLICK      |
|u1     |purchase|5.0  |PURCHASE   |
|u2     |click   |0.9  |CLICK      |
+-------+--------+-----+-----------+

+-------+--------+---------+-----------------+
|user_id|n_events|sum_value|event_types      |
+-------+--------+---------+-----------------+
|u1     |2       |6.2      |[CLICK, PURCHASE]|
|u2     |1       |0.9      |[CLICK]          |
+-------+--------+---------+-----------------+



In [5]:

# Optional: write/read Parquet to demonstrate I/O
out_path = "/workspace/data/output/pyspark_demo_parquet"
(agg.coalesce(1).write.mode("overwrite").parquet(out_path))

df_read = spark.read.parquet(out_path)
df_read.show(truncate=False)

print("Wrote and reloaded Parquet at:", out_path)


+-------+--------+---------+-----------------+
|user_id|n_events|sum_value|event_types      |
+-------+--------+---------+-----------------+
|u1     |2       |6.2      |[CLICK, PURCHASE]|
|u2     |1       |0.9      |[CLICK]          |
+-------+--------+---------+-----------------+

Wrote and reloaded Parquet at: /workspace/data/output/pyspark_demo_parquet


## 2) Kafka: create topic

In [6]:

# This cell tries to create the topic if it doesn't exist.
# If Kafka is configured with auto-topic-creation, this may be unnecessary.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": KAFKA_BOOTSTRAP})

# Check existing topics
md = admin.list_topics(timeout=10)
existing = set(md.topics.keys())
print("Existing topics (sample):", sorted(list(existing))[:10], "...")

if TOPIC in existing:
    print(f"Topic '{TOPIC}' already exists.")
else:
    print(f"Creating topic '{TOPIC}' (best-effort)...")
    new_topic = NewTopic(topic=TOPIC, num_partitions=1, replication_factor=1)
    fs = admin.create_topics([new_topic], request_timeout=15)
    for t, f in fs.items():
        try:
            f.result()
            print(f"Created topic: {t}")
        except Exception as e:
            print(f"Topic creation result for {t}: {e}")


Existing topics (sample): [] ...
Creating topic 'sid45_demo' (best-effort)...
Created topic: sid45_demo


## 3) Kafka Producer
We will produce a handful of JSON messages into the topic.

In [7]:

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": KAFKA_BOOTSTRAP,
    "retries": 3,
    "linger.ms": 50,
})

delivered = {"count": 0}

def delivery_report(err, msg):
    if err is not None:
        print("Delivery failed:", err)
    else:
        delivered["count"] += 1
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")

batch_id = str(uuid.uuid4())[:8]
messages = [
    {"batch": batch_id, "event_id": 1, "user_id": "u1", "event": "view", "value": 0.3, "ts": time.time()},
    {"batch": batch_id, "event_id": 2, "user_id": "u2", "event": "click", "value": 1.0, "ts": time.time()},
    {"batch": batch_id, "event_id": 3, "user_id": "u1", "event": "purchase", "value": 5.5, "ts": time.time()},
    {"batch": batch_id, "event_id": 4, "user_id": "u3", "event": "view", "value": 0.1, "ts": time.time()},
    {"batch": batch_id, "event_id": 5, "user_id": "u2", "event": "click", "value": 0.7, "ts": time.time()},
]

for m in messages:
    key = f"{m['batch']}-{m['event_id']}".encode("utf-8")
    value = json.dumps(m).encode("utf-8")
    producer.produce(TOPIC, key=key, value=value, on_delivery=delivery_report)
    producer.poll(0)

producer.flush(10)
print("Delivered messages:", delivered["count"])
print("Batch ID:", batch_id)


Delivered to sid45_demo [0] @ offset 0
Delivered to sid45_demo [0] @ offset 1
Delivered to sid45_demo [0] @ offset 2
Delivered to sid45_demo [0] @ offset 3
Delivered to sid45_demo [0] @ offset 4
Delivered messages: 5
Batch ID: 8fe2db1f


## 4) Kafka Consumer 
We will consume messages for a short time window and decode JSON.

In [8]:

from confluent_kafka import Consumer, KafkaException

consumer = Consumer({
    "bootstrap.servers": KAFKA_BOOTSTRAP,
    "group.id": GROUP_ID,
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
})

consumer.subscribe([TOPIC])

received = []
start = time.time()
timeout_s = 8  # keep short to avoid hanging

try:
    while time.time() - start < timeout_s and len(received) < 20:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())

        key = msg.key().decode("utf-8") if msg.key() else None
        val = msg.value().decode("utf-8") if msg.value() else None
        try:
            obj = json.loads(val) if val else None
        except json.JSONDecodeError:
            obj = {"raw": val}

        received.append({
            "topic": msg.topic(),
            "partition": msg.partition(),
            "offset": msg.offset(),
            "key": key,
            "value": obj
        })
finally:
    consumer.close()

print(f"Consumed {len(received)} messages (within ~{timeout_s}s).")
received[:3]


Consumed 5 messages (within ~8s).


[{'topic': 'sid45_demo',
  'partition': 0,
  'offset': 0,
  'key': '8fe2db1f-1',
  'value': {'batch': '8fe2db1f',
   'event_id': 1,
   'user_id': 'u1',
   'event': 'view',
   'value': 0.3,
   'ts': 1771719623.057138}},
 {'topic': 'sid45_demo',
  'partition': 0,
  'offset': 1,
  'key': '8fe2db1f-2',
  'value': {'batch': '8fe2db1f',
   'event_id': 2,
   'user_id': 'u2',
   'event': 'click',
   'value': 1.0,
   'ts': 1771719623.0571396}},
 {'topic': 'sid45_demo',
  'partition': 0,
  'offset': 2,
  'key': '8fe2db1f-3',
  'value': {'batch': '8fe2db1f',
   'event_id': 3,
   'user_id': 'u1',
   'event': 'purchase',
   'value': 5.5,
   'ts': 1771719623.05714}}]

## 5) Convert consumed messages into a Spark DataFrame

In [9]:

from pyspark.sql import functions as F

rows = []
for r in received:
    v = r.get("value") or {}
    if isinstance(v, dict):
        rows.append({
            "topic": r["topic"],
            "partition": r["partition"],
            "offset": r["offset"],
            "key": r["key"],
            "batch": v.get("batch"),
            "event_id": v.get("event_id"),
            "user_id": v.get("user_id"),
            "event": v.get("event"),
            "value": v.get("value"),
            "ts": v.get("ts"),
        })
    else:
        rows.append({
            "topic": r["topic"],
            "partition": r["partition"],
            "offset": r["offset"],
            "key": r["key"],
            "batch": None,
            "event_id": None,
            "user_id": None,
            "event": None,
            "value": None,
            "ts": None,
        })

df_kafka = spark.createDataFrame(rows)
df_kafka.show(truncate=False)


+--------+--------+--------+----------+------+---------+----------+--------------------+-------+-----+
|batch   |event   |event_id|key       |offset|partition|topic     |ts                  |user_id|value|
+--------+--------+--------+----------+------+---------+----------+--------------------+-------+-----+
|8fe2db1f|view    |1       |8fe2db1f-1|0     |0        |sid45_demo|1.771719623057138E9 |u1     |0.3  |
|8fe2db1f|click   |2       |8fe2db1f-2|1     |0        |sid45_demo|1.7717196230571396E9|u2     |1.0  |
|8fe2db1f|purchase|3       |8fe2db1f-3|2     |0        |sid45_demo|1.77171962305714E9  |u1     |5.5  |
|8fe2db1f|view    |4       |8fe2db1f-4|3     |0        |sid45_demo|1.7717196230571408E9|u3     |0.1  |
|8fe2db1f|click   |5       |8fe2db1f-5|4     |0        |sid45_demo|1.771719623057141E9 |u2     |0.7  |
+--------+--------+--------+----------+------+---------+----------+--------------------+-------+-----+



In [10]:

# A small Spark analysis on consumed events
if df_kafka.count() > 0:
    df_kafka.groupBy("user_id").agg(
        F.count("*").alias("n_msgs"),
        F.sum("value").alias("sum_value")
    ).orderBy(F.desc("sum_value")).show(truncate=False)
else:
    print("No messages consumed — check Kafka service / topic / networking.")


+-------+------+---------+
|user_id|n_msgs|sum_value|
+-------+------+---------+
|u1     |2     |5.8      |
|u2     |2     |1.7      |
|u3     |1     |0.1      |
+-------+------+---------+

