In [1]:
pip install kafka-python

In [2]:
pip install pyspark

In [3]:
import os
import pyspark

In [4]:
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_2.13:{pyspark.__version__} pyspark-shell'
os.environ['SPARK_SUBMIT_OPTS'] = '-Djdk.security.auth.login.Config=ignore'


In [5]:
KAFKA_BROKER_URL = "localhost:9092"
KAFKA_TOPIC = "wikimedia_topic_1"

In [6]:
pip install requests

In [7]:
pip install sseclient

In [8]:
import threading
import time
from kafka import KafkaConsumer
from kafka import KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, expr
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.streaming import StreamingContext


In [9]:
# *** Note!!! **** At this point you need to have Kafka broker running. See Setup for Docker and Kafka.

producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)

In [10]:
# *** Note!!! **** In order to read wikimedia changes, you need to obtain Access Token 
# See instructions in Readm about how to authenticate to Wikimedia page

In [11]:
import requests
from sseclient import SSEClient
URL = 'https://stream.wikimedia.org/v2/stream/recentchange'
headers = {
    "User-Agent": "[YOUR_APP_NAME_HERE]",
    "Authorization": "Bearer [YOUR_ACCESS_TOKEN_HERE]"
}
def relay():
    events = SSEClient(URL, headers=headers, timeout=30)
    for i in range(100):
        for event in events:
            if event.event == 'message' and event.data != None:
                message = event.data.encode("utf-8")
                producer.send(KAFKA_TOPIC, value=message)
                break
threading.Thread(target=relay).start()


In [12]:
# *** Note!!! **** Before continue to the next phase, make sure that you have a topic and events in it.
# See 'Lookup Kafka Topics' in the Readme.

In [13]:
spark = SparkSession.builder \
    .appName("PySpark-jupyter-streaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:{pyspark.__version__}") \
    .config("spark.sql.streaming.checkpointLocation", "./checkpoint") \
    .getOrCreate()

:: loading settings :: url = jar:file:/Users/ran/.venv3.11/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/ran/.ivy2.5.2/cache
The jars for the packages stored in: /Users/ran/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6e292bff-b487-4dbb-a603-6029cc7fc390;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.1.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.1.0 in central
	found org.apache.kafka#kafka-clients;3.9.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.8 in central
	found org.slf4j#slf4j-api;2.0.17 in central
	found org.apache.hadoop#hadoop-client-runtime;3.4.2 in central
	found org.apache.hadoop#hadoop-client-api;3.4.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org

In [14]:
kafka_df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", KAFKA_BROKER_URL) \
  .option("subscribe", KAFKA_TOPIC) \
  .option("startingOffsets", "earliest") \
  .load()

In [15]:
schema = StructType() \
    .add("id", IntegerType()) \
    .add("type", StringType()) \
    .add("comment", StringType()) \
    .add("user", StringType()) \
    .add("title", StringType()) \
    .add("server_name", StringType())

# Transform data to dataframe of json format
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

In [16]:
parsed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start() 

In [17]:
parsed_df.createOrReplaceTempView("parsed_df")

spark.sql("select user, count(*) as count from parsed_df group by user") \
.writeStream \
.outputMode("complete") \
.format("console") \
.start() 

25/12/30 13:33:44 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x1093c8a90>

25/12/30 13:33:44 WARN MicroBatchExecution: Disabling AQE since AQE is not supported in stateful workloads.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------------+-----+
|             user|count|
+-----------------+-----+
|   Mario Falcetti|    4|
|          9Aaron3|    1|
|         Escarbot|    1|
|           Dexbot|    2|
|      ListeriaBot|    1|
|          RussBot|    3|
|       Fl.schmitt|   18|
|       Bot-Jagwar|    2|
|        Orokbest1|    1|
| Egon Willighagen|   12|
| MIKHAIL-TAL-GSMC|    2|
|Artslettersperson|    1|
|         大慈树王|    1|
|           روتانا|    1|
|           Acélan|    1|
|        Joseolgon|    3|
|    RadioactOlive|    1|
|Oleksandr Tahayev|    1|
|            Yxe:h|    2|
|             NULL|    3|
+-----------------+-----+
only showing top 20 rows


In [18]:
spark.sql("select type, count(*) as count from parsed_df group by type") \
.writeStream \
.outputMode("complete") \
.format("console") \
.start() 

25/12/30 13:33:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x1093cb910>

25/12/30 13:33:53 WARN MicroBatchExecution: Disabling AQE since AQE is not supported in stateful workloads.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+-----+
|      type|count|
+----------+-----+
|       new|   10|
|       log|   23|
|      NULL|    3|
|      edit|  168|
|categorize|   96|
+----------+-----+

