In [1]:
pip install kafka-python

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
import os
import pyspark

In [4]:
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_2.13:{pyspark.__version__} pyspark-shell'
os.environ['SPARK_SUBMIT_OPTS'] = '-Djdk.security.auth.login.Config=ignore'


In [5]:
KAFKA_BROKER_URL = "localhost:9092"
KAFKA_TOPIC = "wikimedia_topic_1"

In [6]:
pip install requests

Note: you may need to restart the kernel to use updated packages.


In [7]:
pip install sseclient

Note: you may need to restart the kernel to use updated packages.


In [8]:
import threading
import time
from kafka import KafkaConsumer
from kafka import KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, expr
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.streaming import StreamingContext


In [9]:
# *** Note!!! **** At this point you need to have Kafka broker running. See Setup for Docker and Kafka.

producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)

In [10]:
# *** Note!!! **** In order to read wikimedia changes, you need to obtain Access Token 
# See instructions in Readm about how to authenticate to Wikimedia page

In [11]:
import requests
from sseclient import SSEClient
URL = 'https://stream.wikimedia.org/v2/stream/recentchange'
headers = {
    "User-Agent": "[YOUR_APP_NAME_HERE]",
    "Authorization": "Bearer [YOUR_ACCESS_TOKEN_HERE]"
}
def relay():
    events = SSEClient(URL, headers=headers, timeout=30)
    for i in range(100):
        for event in events:
            if event.event == 'message' and event.data != None:
                message = event.data.encode("utf-8")
                producer.send(KAFKA_TOPIC, value=message)
                break
threading.Thread(target=relay).start()


In [12]:
# *** Note!!! **** Before continue to the next phase, make sure that you have a topic and events in it.
# See 'Lookup Kafka Topics' in the Readme.

In [13]:
spark = SparkSession.builder \
    .appName("PySpark-jupyter-streaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:{pyspark.__version__}") \
    .config("spark.sql.streaming.checkpointLocation", "./checkpoint") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/15 19:26:42 WARN Utils: Your hostname, RoeisLaptop, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/15 19:26:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/mnt/c/Users/roeiu/OneDrive/Documents/Big%20Data%20-%20Course/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2.5.2/cache
The jars for the packages stored in: /root/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c86e2b04-243b-4808-8f2c-c56506d4fc26;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.1.1 in central
	found org.apache.kafka#kafka-clie

In [14]:
kafka_df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", KAFKA_BROKER_URL) \
  .option("subscribe", KAFKA_TOPIC) \
  .option("startingOffsets", "earliest") \
  .load()

In [15]:
schema = StructType() \
    .add("id", IntegerType()) \
    .add("type", StringType()) \
    .add("comment", StringType()) \
    .add("user", StringType()) \
    .add("title", StringType()) \
    .add("server_name", StringType())

# Transform data to dataframe of json format
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

In [16]:
parsed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start() 

26/01/15 19:27:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x79ba5a555580>

In [17]:
parsed_df.createOrReplaceTempView("parsed_df")

spark.sql("select user, count(*) as count from parsed_df group by user") \
.writeStream \
.outputMode("complete") \
.format("console") \
.start() 

26/01/15 19:27:04 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x79ba59a7e240>

In [18]:
spark.sql("select type, count(*) as count from parsed_df group by type") \
.writeStream \
.outputMode("complete") \
.format("console") \
.start() 

26/01/15 19:27:05 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
26/01/15 19:27:05 WARN MicroBatchExecution: Disabling AQE since AQE is not supported in stateful workloads.


<pyspark.sql.streaming.query.StreamingQuery at 0x79ba59a7e7b0>

In [19]:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("wikimedia_topic_1", b"hello from jupyter")
producer.flush()

print("message sent")


message sent


In [20]:
import os, sys
print("Python:", sys.executable)
print("JAVA_HOME:", os.environ.get("JAVA_HOME"))

Python: /mnt/c/Users/roeiu/OneDrive/Documents/Big Data - Course/.venv/bin/python3
JAVA_HOME: /usr/lib/jvm/java-17-openjdk-amd64


In [44]:
# Stop all running streaming queries (safe reset)
for q in spark.streams.active:
    q.stop()

print("Stopped:", len(spark.streams.active))



Stopped: 0


In [19]:
spark


[Stage 2:>(21 + 16) / 200][Stage 4:> (16 + 0) / 200][Stage 5:>    (0 + 0) / 1]

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------+-----+
|          user|count|
+--------------+-----+
|    Emilio2005|    1|
|     MacTire02|    1|
|    Rathfelder|    1|
|    MathKnight|    1|
|      NotNahid|    1|
|    Dronebogus|    1|
|         Unsui|    2|
|        Geagea|    5|
|         Aju88|    1|
|      Escarbot|    1|
|    CheWikibot|   11|
|          Yann|    1|
|CoconutOctopus|    1|
|     FrankB157|    1|
|      BuldoÅ¼er|    1|
|   Vovanlocpro|    1|
|   BhagyaMohan|   21|
|   Cric editor|    1|
|     KaleemBot|    3|
|         Mellk|    1|
+--------------+-----+
only showing top 20 rows




-------------------------------------------
Batch: 1
-------------------------------------------
+----+----+-------+----+-----+-----------+
|  id|type|comment|user|title|server_name|
+----+----+-------+----+-----+-----------+
|NULL|NULL|   NULL|NULL| NULL|       NULL|
+----+----+-------+----+-----+-----------+



                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+-----+
|      type|count|
+----------+-----+
|       new|   21|
|       log|   53|
|      NULL|    9|
|      edit|  193|
|categorize|  228|
+----------+-----+



In [21]:
kafka_df.isStreaming


True

In [22]:
q_raw = (kafka_df.writeStream
         .outputMode("append")
         .format("console")
         .option("truncate", "false")
         .start())


26/01/15 19:21:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [23]:
q_raw.status


{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [24]:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
for i in range(5):
    producer.send("wikimedia_topic_1", f"hello {i}".encode())
producer.flush()
print("sent")


sent
-------------------------------------------
Batch: 1
-------------------------------------------
-------------------------------------------
Batch: 2
-------------------------------------------
+----+----------------------+-----------------+---------+------+-----------------------+-------------+
|key |value                 |topic            |partition|offset|timestamp              |timestampType|
+----+----------------------+-----------------+---------+------+-----------------------+-------------+
|NULL|[68 65 6C 6C 6F 20 30]|wikimedia_topic_1|0        |504   |2026-01-15 19:22:23.409|0            |
|NULL|[68 65 6C 6C 6F 20 31]|wikimedia_topic_1|0        |505   |2026-01-15 19:22:23.409|0            |
|NULL|[68 65 6C 6C 6F 20 32]|wikimedia_topic_1|0        |506   |2026-01-15 19:22:23.409|0            |
|NULL|[68 65 6C 6C 6F 20 33]|wikimedia_topic_1|0        |507   |2026-01-15 19:22:23.409|0            |
|NULL|[68 65 6C 6C 6F 20 34]|wikimedia_topic_1|0        |508   |2026-01-15 19:22

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-----+
|      type|count|
+----------+-----+
|       new|   21|
|       log|   53|
|      NULL|   14|
|      edit|  193|
|categorize|  228|
+----------+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------+-----+
|          user|count|
+--------------+-----+
|    Emilio2005|    1|
|     MacTire02|    1|
|    Rathfelder|    1|
|    MathKnight|    1|
|      NotNahid|    1|
|    Dronebogus|    1|
|         Unsui|    2|
|        Geagea|    5|
|         Aju88|    1|
|      Escarbot|    1|
|    CheWikibot|   11|
|          Yann|    1|
|CoconutOctopus|    1|
|     FrankB157|    1|
|      BuldoÅ¼er|    1|
|   Vovanlocpro|    1|
|   BhagyaMohan|   21|
|   Cric editor|    1|
|     KaleemBot|    3|
|      Malcolma|    6|
+--------------+-----+
only showing top 20 rows


In [25]:
for q in spark.streams.active:
    q.stop()

print("All streams stopped")


All streams stopped


26/01/15 19:24:37 WARN DAGScheduler: Failed to cancel job group 76649464-5705-44d2-aa0a-d2bfa0530a14. Cannot find active jobs for it.
26/01/15 19:24:37 WARN DAGScheduler: Failed to cancel job group 76649464-5705-44d2-aa0a-d2bfa0530a14. Cannot find active jobs for it.
26/01/15 19:24:37 WARN DAGScheduler: Failed to cancel job group c1bb9aa3-f002-4e3b-8e6a-076dd2cb7a66. Cannot find active jobs for it.
26/01/15 19:24:37 WARN DAGScheduler: Failed to cancel job group c1bb9aa3-f002-4e3b-8e6a-076dd2cb7a66. Cannot find active jobs for it.
26/01/15 19:24:37 WARN DAGScheduler: Failed to cancel job group 745faaaf-4637-485a-b880-e7096a8619fd. Cannot find active jobs for it.
26/01/15 19:24:37 WARN DAGScheduler: Failed to cancel job group 745faaaf-4637-485a-b880-e7096a8619fd. Cannot find active jobs for it.
26/01/15 19:24:37 WARN DAGScheduler: Failed to cancel job group 3bcd580a-0d2f-49fc-a9d6-95244f684421. Cannot find active jobs for it.
26/01/15 19:24:37 WARN DAGScheduler: Failed to cancel job grou

In [21]:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("wikimedia_topic_1", b"hello from jupyter")
producer.flush()


26/01/15 19:27:05 WARN MicroBatchExecution: Disabling AQE since AQE is not supported in stateful workloads.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+----------+--------------------+-------------+--------------------+--------------------+
|        id|      type|             comment|         user|               title|         server_name|
+----------+----------+--------------------+-------------+--------------------+--------------------+
|      NULL|      NULL|                NULL|         NULL|                NULL|                NULL|
|      NULL|categorize|[[:File:Winding h...| Dave.Dunford|    Category:Wincham|commons.wikimedia...|
|    358900|categorize|[[:Pajenn:Ar Floc...|      Aveldro|     Rummad:Adlennet|   br.wikisource.org|
| 134116775|       log|Bot: Mass deletin...|    Ladsgroup|Tháº£o luáº­n ThÃ nh v...|    vi.wikipedia.org|
|      NULL|      edit|/* wbeditentity-u...|     Fabriemo|File:Musician Jul...|commons.wikimedia...|
|      NULL|      edit|Adding [[Category...|   Rkieferbot|File:Vortrag Skot...|commons.wik

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------+-----+
|          user|count|
+--------------+-----+
|    Emilio2005|    1|
|    D1mat3rrbo|    1|
|     MacTire02|    1|
|    Rathfelder|    1|
|    MathKnight|    1|
|      NotNahid|    1|
|    Dronebogus|    1|
|         Unsui|    2|
|        Geagea|    5|
|         Aju88|    1|
|      Escarbot|    2|
|    CheWikibot|   23|
|          Yann|    1|
|      Bpierreb|    8|
|      Eagle003|    1|
|     Maykwdiik|    1|
|37.193.158.132|    1|
|CoconutOctopus|    1|
|     FrankB157|    1|
|      BuldoÅ¼er|    1|
+--------------+-----+
only showing top 20 rows


                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----+----+-------+----+-----+-----------+
|  id|type|comment|user|title|server_name|
+----+----+-------+----+-----+-----------+
|NULL|NULL|   NULL|NULL| NULL|       NULL|
|NULL|NULL|   NULL|NULL| NULL|       NULL|
+----+----+-------+----+-----+-----------+

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+-----+
|      type|count|
+----------+-----+
|       new|   28|
|       log|   61|
|      NULL|   17|
|      edit|  231|
|categorize|  274|
+----------+-----+

