# Structured Streaming with Apache Kafka

## Example 1

Reading a Kafka topic in AWS.
Before executing this code, replace `kafka:9094` by the right bootstrap server

In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "34.139.223.68:9094") \
  .option("subscribe", "toots") \
  .load()
  
schema = StructType(
    [
        StructField('id', StringType(), True),
        StructField('content', StringType(), True),
        StructField('created_at', StringType(), True),
        StructField('account', StringType(), True)
    ]
)
df.printSchema()

dataset = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp") \
    .withColumn("value", from_json("value", schema)) \
    .select(col('key'), col("timestamp"), col('value.*'))

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [0]:
dataset.writeStream \
 .outputMode("append") \
 .format("memory") \
 .option("truncate", "false") \
 .queryName("toots_topic") \
 .start()

Out[2]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f94b8ccc250>

In [0]:
%sql
SELECT
  *
FROM
  toots_topic

key,timestamp,id,content,created_at,account
,2024-06-21T10:53:42.197+0000,112654235862211765,A veces me paro a pensar y es fascinante que nos guste tanto la genitalia de las plantas xD,2024-06-21 10:53:41+00:00,Ransil@todon.eu
,2024-06-21T10:54:17.117+0000,112654238154815524,,2024-06-21 10:54:15+00:00,elhackernet@infosec.exchange
,2024-06-21T10:54:24.838+0000,112654238661764824,,2024-06-21 10:54:20+00:00,elhackernet@infosec.exchange


## Exercise 1

Apply a sliding window each minute, 5 minutes of duration, grouping by `server`. A server in Mastodon is the domain in account column

---



In [0]:
from pyspark.sql.functions import col, window, split, when
(
dataset
  .withColumn("server", when(col("account").contains("@"), split(col("account"), "@").getItem(1))
                   .otherwise(None))
  .groupBy(window(col("timestamp"), "5 minutes", "1 minutes"), col("server"))
  .count()
  .display()
)

window,server,count
"List(2024-06-21T10:52:00.000+0000, 2024-06-21T10:57:00.000+0000)",infosec.exchange,2
"List(2024-06-21T10:53:00.000+0000, 2024-06-21T10:58:00.000+0000)",mastodon.gamedev.place,1
"List(2024-06-21T10:53:00.000+0000, 2024-06-21T10:58:00.000+0000)",social.lansky.name,1
"List(2024-06-21T10:55:00.000+0000, 2024-06-21T11:00:00.000+0000)",mastodon.gamedev.place,1
"List(2024-06-21T10:55:00.000+0000, 2024-06-21T11:00:00.000+0000)",plush.city,1
"List(2024-06-21T10:54:00.000+0000, 2024-06-21T10:59:00.000+0000)",infosec.exchange,2
"List(2024-06-21T10:56:00.000+0000, 2024-06-21T11:01:00.000+0000)",masto.pt,1
"List(2024-06-21T10:55:00.000+0000, 2024-06-21T11:00:00.000+0000)",mastodon.art,1
"List(2024-06-21T10:57:00.000+0000, 2024-06-21T11:02:00.000+0000)",mastodon.art,1
"List(2024-06-21T10:53:00.000+0000, 2024-06-21T10:58:00.000+0000)",mastodon.art,1


In [0]:
from pyspark.sql.functions import col, window, split, when
(
dataset
  .withColumn("server", when(col("account").contains("@"), split(col("account"), "@").getItem(1))
                   .otherwise(None))
  .groupBy(window(col("timestamp"), "5 minutes", "1 minutes"), col("server"))
  .count()
  .writeStream \
  .outputMode("update") \
  .format("memory") \
  .option("truncate", "false") \
  .queryName("toots_update_topic") \
  .start()
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7feedc4d5db0>

In [0]:
%sql
select * from toots_update_topic

window,server,count
"List(2024-03-10T11:27:00Z, 2024-03-10T11:32:00Z)",chaosfem.tw,1
"List(2024-03-10T11:23:00Z, 2024-03-10T11:28:00Z)",chaosfem.tw,1
"List(2024-03-10T11:26:00Z, 2024-03-10T11:31:00Z)",chaosfem.tw,1
"List(2024-03-10T11:25:00Z, 2024-03-10T11:30:00Z)",chaosfem.tw,1
"List(2024-03-10T11:24:00Z, 2024-03-10T11:29:00Z)",chaosfem.tw,1
"List(2024-03-10T11:25:00Z, 2024-03-10T11:30:00Z)",mstdn.jp,1
"List(2024-03-10T11:28:00Z, 2024-03-10T11:33:00Z)",mstdn.jp,1
"List(2024-03-10T11:25:00Z, 2024-03-10T11:30:00Z)",41020.social,1
"List(2024-03-10T11:27:00Z, 2024-03-10T11:32:00Z)",41020.social,1
"List(2024-03-10T11:28:00Z, 2024-03-10T11:33:00Z)",41020.social,1


## Exercise 2

Each minute, get the number of toots received in last 5 minutes

---



In [0]:
dataset.groupBy(window(col("timestamp"), "5 minutes", "1 minutes")) \
    .count() \
    .writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("toots_windowed_2") \
    .start()

In [0]:
%sql
SELECT
  *
FROM
  toots_windowed_2
ORDER BY
  count DESC

## Exercise 3

Get top words with more than 3 letters in 1 minute slots

---



In [0]:
from pyspark.sql.functions import lower, explode, length

dataset = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp") \
    .withColumn("value", from_json("value", schema)) \
    .select(col('key'), col("timestamp"), col('value.*')) \
    .select(explode(split(col("content"), " ")).alias("word"), "timestamp") \
    .filter(length(col("word")) > 3)

dataset.groupBy(window(col("timestamp"), "1 minutes"), col("word")) \
    .count() \
    .display()

window,word,count
"List(2024-03-10T11:46:00Z, 2024-03-10T11:47:00Z)",cabeza,1.0
"List(2024-03-10T11:44:00Z, 2024-03-10T11:45:00Z)","rel=""nofollow",5.0
"List(2024-03-10T11:46:00Z, 2024-03-10T11:47:00Z)","rel=""nofollow",1.0
"List(2024-03-10T11:44:00Z, 2024-03-10T11:45:00Z)",realiza,1.0
"List(2024-03-10T11:47:00Z, 2024-03-10T11:48:00Z)","mention""",1.0
"List(2024-03-10T11:45:00Z, 2024-03-10T11:46:00Z)","class=""invisible"">k",1.0
"List(2024-03-10T11:46:00Z, 2024-03-10T11:47:00Z)",quería,1.0
"List(2024-03-10T11:46:00Z, 2024-03-10T11:47:00Z)",como,1.0
"List(2024-03-10T11:46:00Z, 2024-03-10T11:47:00Z)","noreferrer""",1.0
"List(2024-03-10T11:46:00Z, 2024-03-10T11:47:00Z)",🤡,1.0


## Clean up DBFS

In [0]:
%scala
// Clean up
val PATH = "dbfs:/tmp/"
dbutils.fs.ls(PATH)
            .map(_.name)
            .foreach((file: String) => dbutils.fs.rm(PATH + file, true))