In [1]:
pip install kafka-python

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install pyspark




In [3]:
pip install requests




In [4]:
pip install sseclient




In [5]:
import os
import threading
import time

import pyspark
from kafka import KafkaConsumer
from kafka import KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, expr
from pyspark.sql.types import StructType, StringType, IntegerType, LongType
from pyspark.streaming import StreamingContext

In [6]:
# Environment consumed when PySpark launches its JVM, so this cell must run
# before the SparkSession is created.
#  - PYSPARK_SUBMIT_ARGS pulls in the Kafka connector matching the installed
#    PySpark version.
#  - SPARK_SUBMIT_OPTS passes a JVM system property; NOTE(review): presumably a
#    workaround for a JAAS login-config problem in this local setup -- confirm.
os.environ.update({
    'PYSPARK_SUBMIT_ARGS': f'--packages org.apache.spark:spark-sql-kafka-0-10_2.13:{pyspark.__version__} pyspark-shell',
    'SPARK_SUBMIT_OPTS': '-Djdk.security.auth.login.Config=ignore',
})

In [7]:
# Kafka connection settings. Each can be overridden by an environment variable
# of the same name; the defaults target a broker on localhost.
KAFKA_BROKER_URL = os.environ.get("KAFKA_BROKER_URL", "localhost:9092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "wikimedia_topic_1")

In [8]:
# NOTE: a Kafka broker must already be running at KAFKA_BROKER_URL before this
# cell executes -- see the Docker/Kafka setup instructions.
# The producer sends raw bytes (the relay cell encodes each payload itself).
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER_URL,
)

In [9]:
import requests
from sseclient import SSEClient

URL = 'https://stream.wikimedia.org/v2/stream/recentchange'

# SECURITY: never hardcode credentials in a notebook -- they leak through version
# control and shared copies. An optional bearer token is read from the
# WIKIMEDIA_ACCESS_TOKEN environment variable and only sent when it is set.
# Any token that was previously committed in this cell must be revoked.
headers = {
    "User-Agent": "big_data",
}
_wikimedia_token = os.environ.get("WIKIMEDIA_ACCESS_TOKEN")
if _wikimedia_token:
    headers["Authorization"] = f"Bearer {_wikimedia_token}"


def relay(max_messages=100):
    """Forward Wikimedia recent-change events from the SSE stream into Kafka.

    Reads server-sent events from ``URL`` and publishes the UTF-8 encoded
    payload of each ``message`` event with a non-blank body to ``KAFKA_TOPIC``
    through the notebook-level ``producer``.

    Args:
        max_messages: number of messages to forward before returning
            (default 100). Values <= 0 return without opening the stream.
    """
    if max_messages <= 0:
        return
    events = SSEClient(URL, headers=headers, timeout=30)
    sent = 0
    try:
        for event in events:
            # Skip non-message events and empty/blank payloads (presumably
            # keep-alive frames): they are not JSON and would turn into
            # all-NULL rows when parsed downstream.
            if event.event != 'message' or not (event.data or "").strip():
                continue
            producer.send(KAFKA_TOPIC, value=event.data.encode("utf-8"))
            sent += 1
            if sent >= max_messages:
                break
    finally:
        # KafkaProducer.send() only buffers the record; flush so everything
        # is actually delivered before this thread finishes.
        producer.flush()
        # Release the underlying streaming HTTP connection if one is exposed.
        response = getattr(events, "resp", None)
        if response is not None:
            response.close()


# Keep a handle so the relay can be joined/inspected from later cells.
relay_thread = threading.Thread(target=relay, name="wikimedia-relay")
relay_thread.start()


In [10]:
# NOTE: before continuing to the next phase, make sure the Kafka topic exists
# and already contains events. See 'Lookup Kafka Topics' in the Readme.

spark = (
    SparkSession.builder
    .appName("PySpark-jupyter-streaming")
    # f-string so the Kafka connector version tracks the installed PySpark
    # version (same coordinates as PYSPARK_SUBMIT_ARGS above).
    .config(
        "spark.jars.packages",
        f"org.apache.spark:spark-sql-kafka-0-10_2.13:{pyspark.__version__}",
    )
    # Default checkpoint directory for structured-streaming queries.
    .config("spark.sql.streaming.checkpointLocation", "./checkpoint")
    .getOrCreate()
)

In [11]:
# Read everything currently in the topic as a one-off batch. Batch mode is used
# because it works on Windows; the streaming variant used spark.readStream with
# the same options (without "endingOffsets").
kafka_df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER_URL)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

print(f"Total messages in Kafka topic: {kafka_df.count()}")

Total messages in Kafka topic: 900


In [12]:
# Subset of the Wikimedia recentchange event fields used in this notebook.
# `id` is a MediaWiki database id that may exceed the 32-bit signed range, so it
# is read as LongType (with IntegerType, large ids fail conversion and come
# back as NULL).
schema = (
    StructType()
    .add("id", LongType())
    .add("type", StringType())
    .add("comment", StringType())
    .add("user", StringType())
    .add("title", StringType())
    .add("server_name", StringType())
)

# Kafka delivers the payload in the binary `value` column: cast it to a string,
# parse the JSON against `schema`, and flatten the struct into top-level
# columns. Payloads that cannot be parsed yield NULL rows.
parsed_df = (
    kafka_df
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

print(f"Parsed {parsed_df.count()} records")
parsed_df.show(10, truncate=False)

Parsed 900 records
+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+-----------------------------------------------------------+---------------------+
|id        |type      |comment                                                                                                                                                                |user            |title                                                      |server_name          |
+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+-----------------------------------------------------------+---------------------+
|NULL      |NULL      |NULL                                                                                 

In [13]:
# Register the parsed batch under a view name so the following cells can query
# it with Spark SQL, then preview a few rows. (The streaming variant wrote
# parsed_df to the console sink with writeStream in "append" mode instead.)
parsed_df.createOrReplaceTempView("wikimedia_changes")

print("Sample data:")
(
    spark.sql("SELECT * FROM wikimedia_changes LIMIT 10")
    .show(truncate=False)
)

Sample data:
+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+-----------------------------------------------------------+---------------------+
|id        |type      |comment                                                                                                                                                                |user            |title                                                      |server_name          |
+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+-----------------------------------------------------------+---------------------+
|NULL      |NULL      |NULL                                                                                       

In [14]:
# Top 10 most active users in the snapshot. (The streaming variant ran the same
# GROUP BY via writeStream with outputMode("complete") to the console sink.)
print("Edits by user:")
spark.sql(
    """
    SELECT user, COUNT(*) as count 
    FROM wikimedia_changes 
    GROUP BY user 
    ORDER BY count DESC
    LIMIT 10
"""
).show(truncate=False)

Edits by user:
+------------------+-----+
|user              |count|
+------------------+-----+
|PantheraLeo1359531|134  |
|CheWikibot        |92   |
|DPLA bot          |70   |
|Rkieferbot        |48   |
|Emijrpbot         |48   |
|ROCKY             |28   |
|Brunaldo Bruno    |26   |
|Ladsgroup         |26   |
|Shakko            |14   |
|GrandDukeMarcelo  |14   |
+------------------+-----+



In [15]:
# Number of changes per change type, most frequent first. (The streaming variant
# ran the same GROUP BY via writeStream with outputMode("complete") to the
# console sink.)
print("Edits by type:")
spark.sql(
    """
    SELECT type, COUNT(*) as count 
    FROM wikimedia_changes 
    GROUP BY type 
    ORDER BY count DESC
"""
).show(truncate=False)

Edits by type:
+----------+-----+
|type      |count|
+----------+-----+
|categorize|465  |
|edit      |330  |
|log       |68   |
|new       |25   |
|NULL      |12   |
+----------+-----+

