In [1]:
import json
import time
import requests
from kafka import KafkaProducer

# --- Configuration -----------------------------------------------------------
STREAM_URL = "https://stream.wikimedia.org/v2/stream/recentchange"
TOPIC = "wikimedia_recentchange"
MAX_EVENTS = 200            # stop after this many events have been handed to Kafka
PROGRESS_EVERY = 20         # print a progress line every N events
SSE_DATA_PREFIX = b"data: " # only lines with this prefix carry a JSON payload
HTTP_TIMEOUT = (10, 60)     # (connect, read) seconds, so a stalled stream cannot hang the cell

# Producer that JSON-encodes each event dict to UTF-8 bytes before sending.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Named `sent_count` (not `count`) so it does not collide with
# `pyspark.sql.functions.count`, which a later cell imports into the same namespace.
sent_count = 0
skipped_count = 0

try:
    # The context manager releases the streaming connection even when we
    # leave the loop early or an error is raised.
    with requests.get(
        STREAM_URL,
        stream=True,
        headers={"User-Agent": "spark-streaming-training/1.0"},
        timeout=HTTP_TIMEOUT,
    ) as resp:
        resp.raise_for_status()

        for line in resp.iter_lines():
            # Empty lines and lines without the data prefix carry no payload.
            if not line.startswith(SSE_DATA_PREFIX):
                continue

            payload = line[len(SSE_DATA_PREFIX):]
            try:
                event = json.loads(payload.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Best-effort: skip payloads that are not valid UTF-8 JSON,
                # but keep a tally instead of hiding them completely.
                skipped_count += 1
                continue

            # Errors raised by send() are intentionally NOT swallowed: a broken
            # Kafka connection should stop the cell rather than fail silently.
            producer.send(TOPIC, event)
            sent_count += 1

            if sent_count % PROGRESS_EVERY == 0:
                print("sent", sent_count, "events")

            if sent_count >= MAX_EVENTS:
                break
finally:
    # Always push out buffered records and release the producer,
    # whether the loop finished normally or raised.
    try:
        producer.flush()
    finally:
        producer.close()

if skipped_count:
    print("skipped", skipped_count, "undecodable payloads")
print("DONE. sent total:", sent_count)


sent 20 events
sent 40 events
sent 60 events
sent 80 events
sent 100 events
sent 120 events
sent 140 events
sent 160 events
sent 180 events
sent 200 events
DONE. sent total: 200


In [2]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("WikimediaKafkaToConsole")
    .getOrCreate()
)

# Restrict Spark's log output to ERROR level.
spark.sparkContext.setLogLevel("ERROR")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/18 13:16:00 WARN Utils: Your hostname, Izabellas-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.166.165 instead (on interface en0)
26/01/18 13:16:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/izabellasvavolya/.ivy2.5.2/cache
The jars for the packages stored in: /Users/izabellasvavolya/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
org.apache.spark#spark-token-provider-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3545ba84-4b6d-40a9-a7f1-6e0950d7d021;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.1.1 in central
	found org.apache.spark#spark-token

In [3]:
from pyspark.sql.functions import col

# Streaming source: subscribe to the Kafka topic on the local broker,
# starting from the earliest available offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "wikimedia_recentchange")
    .option("startingOffsets", "earliest")
    .load()
)

# Keep only the record value, cast to a string column named "value".
value_as_text = col("value").cast("string").alias("value")
out = df.select(value_as_text)

# Streaming sink: print every appended row to the console without truncation.
# `q` is the query handle used later to stop the stream.
q = (
    out.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [4]:
from pyspark.sql.functions import get_json_object, count, desc

# Pull the "wiki" and "type" fields out of each JSON string in `value`.
wiki_field = get_json_object(col("value"), "$.wiki").alias("wiki")
type_field = get_json_object(col("value"), "$.type").alias("type")
parsed = out.select(wiki_field, type_field)

# Count rows per wiki, largest count first.
event_total = count("*").alias("cnt")
by_wiki = (
    parsed
    .groupBy("wiki")
    .agg(event_total)
    .orderBy(desc("cnt"))
)

# Write the aggregate to the console in "complete" output mode, untruncated.
# `q2` is the query handle used later to stop the stream.
q2 = (
    by_wiki.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .start()
)


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------+---+
|wiki        |cnt|
+------------+---+
|commonswiki |149|
|wikidatawiki|12 |
|frwiki      |6  |
|viwiki      |5  |
|ruwiki      |4  |
|cawikisource|3  |
|brwikisource|3  |
|metawiki    |3  |
|hawiki      |2  |
|arwiki      |2  |
|hewiki      |2  |
|enwiki      |1  |
|sqwiki      |1  |
|itwiki      |1  |
|tawiki      |1  |
|dewiki      |1  |
|eswiki      |1  |
|afwiki      |1  |
|kowiki      |1  |
|rowiki      |1  |
+------------+---+



In [5]:
# Stop both streaming queries started above, in the order they were created.
for running_query in (q, q2):
    running_query.stop()
