In [1]:
# Build (or reuse) the local Spark session that every streaming query below runs on.
from pyspark.sql import SparkSession

# Maven coordinate of the Kafka connector pulled in via spark.jars.packages.
# NOTE(review): the Scala (2.12) and Spark (3.3.0) versions in this coordinate
# must match the installed PySpark build — confirm when upgrading.
KAFKA_CONNECTOR = 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0'

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Window Operations and Watermarks")
    .config('spark.jars.packages', KAFKA_CONNECTOR)
    # NOTE(review): this key is documented for the DStream API; confirm whether
    # it has any effect on the Structured Streaming queries started below.
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    # Explicit shuffle partition count used by the groupBy in the aggregation
    # step; presumably lowered for a local[*] run.
    .config("spark.sql.shuffle.partitions", 8)
    .getOrCreate()
)

spark

In [2]:
# Streaming source: subscribe to the `wildlife` Kafka topic. "earliest" makes a
# query that starts without existing checkpoint offsets replay the topic from the
# oldest retained record.
kafka_source_options = {
    "kafka.bootstrap.servers": "ed-kafka:29092",
    "subscribe": "wildlife",
    "startingOffsets": "earliest",
}

kafka_df = spark.readStream.format("kafka").options(**kafka_source_options).load()

In [3]:
# The Kafka source exposes the message payload as a binary `value` column.
# Cast it to a string, overwriting the existing `value` column, so the next
# step can parse it as JSON.
from pyspark.sql.functions import expr

value_as_text = expr("cast(value as string)")
kafka_json_df = kafka_df.withColumn("value", value_as_text)

In [4]:
from pyspark.sql.functions import from_json, col, split, explode

# DDL-style schema of the JSON payload. Both fields are read as strings;
# event_time is cast to a timestamp in a later step.
json_schema = "event_time string, data string"

# Parse the JSON text held in `value` into a struct column `values_json`.
# from_json yields null for payloads that do not parse.
parsed_payload = from_json(col("value"), json_schema)
json_df = kafka_json_df.withColumn("values_json", parsed_payload)

In [5]:
# Promote the two parsed JSON fields to top-level columns (`event_time`, `data`).
# Every other column from the Kafka source is dropped by this projection.
flattened_df = json_df.select(
    "values_json.event_time",
    "values_json.data",
)

In [6]:
# Tokenise each message into one row per word and parse the event time.
#
# `split` interprets its second argument as a Java regex. Splitting on runs of
# whitespace (instead of a single space) and dropping empty tokens prevents
# leading, trailing, or repeated spaces from being counted as an empty-string
# "word" in the aggregation. Rows whose `data` is null produce no words.
words_df = (
    flattened_df
    .withColumn("words", split("data", r"\s+"))
    .withColumn("word", explode("words"))
    .filter(col("word") != "")
    # Event-time column used by the watermark and window in the aggregation step.
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

In [7]:
# Inspect the tokenised schema: one row per exploded `word`, with event_time cast to a timestamp.
words_df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- data: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- word: string (nullable = false)



In [8]:
# Count occurrences of each word per sliding event-time window:
# 10-minute windows that start every 5 minutes, so each event lands in two windows.
# The 10-minute watermark lets Spark treat events arriving more than 10 minutes
# behind the latest event_time seen as too late, and (outside complete mode)
# drop state for windows the watermark has passed.
from pyspark.sql.functions import count, lit, window

df_agg = (
    words_df
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window("event_time", "10 minutes", "5 minutes"),
        "word",
    )
    .agg(count(lit(1)).alias("cnt"))
)

In [9]:
# Flatten the `window` struct into plain start/end columns so each output row
# reads as (start_time, end_time, word, cnt).
df_final = df_agg.selectExpr(
    "window.start as start_time",
    "window.end as end_time",
    "word",
    "cnt",
)

In [10]:
# Confirm the final projection: window bounds plus each word and its count.
df_final.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- word: string (nullable = false)
 |-- cnt: long (nullable = false)



In [11]:
# Console sink in *complete* mode: every 30-second trigger reprints the entire
# result table.
#
# NOTE: complete mode must retain all aggregate state, so the watermark on
# df_agg does not evict old windows here; state grows for the life of the query.
#
# The StreamingQuery handle is kept so the query can be stopped
# (`complete_query.stop()`) or awaited. Otherwise re-running this cell while the
# query is still active on the same checkpoint directory fails.
complete_query = (
    df_final
    .writeStream
    .format("console")
    .outputMode("complete")
    .trigger(processingTime='30 seconds')
    .option("checkpointLocation", "checkpoint_dir_kafka_2")
    .start()
)

complete_query

<pyspark.sql.streaming.StreamingQuery at 0x7f2acbcd24d0>

In [12]:
# Console sink in *update* mode: each 30-second trigger prints only the
# (window, word) rows whose counts changed in that micro-batch. Unlike complete
# mode, update mode lets the 10-minute watermark on df_agg evict state for
# windows the watermark has passed.
#
# Uses its own checkpoint directory, so it can run alongside the complete-mode
# query. The handle is kept so the query can be stopped (`update_query.stop()`)
# or awaited instead of being orphaned.
update_query = (
    df_final
    .writeStream
    .format("console")
    .outputMode("update")
    .trigger(processingTime='30 seconds')
    .option("checkpointLocation", "checkpoint_dir_kafka_3")
    .start()
)

update_query

<pyspark.sql.streaming.StreamingQuery at 0x7f2acbcd2b00>