In [None]:
# Build (or reuse) a SparkSession running in local mode under the
# application name "spark_on_docker".
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark_on_docker")
    .getOrCreate()
)

In [None]:
# Use only 5 shuffle partitions for this session.
spark.conf.set("spark.sql.shuffle.partitions", 5)

# Batch-read the activity data once; its schema is reused for the stream below.
static = spark.read.json("work/TheDefinitiveGuide/Spark-The-Definitive-Guide/data/activity-data")

# Read the same directory as a stream, limited to 10 files per trigger.
streaming = (
    spark.readStream
    .schema(static.schema)
    .option("maxFilesPerTrigger", 10)
    .json("work/TheDefinitiveGuide/Spark-The-Definitive-Guide/data/activity-data")
)

streaming.printSchema()

In [None]:
# Keep every original column and add `event_time`: Creation_Time divided by
# 1000000000 and cast to a timestamp (presumably nanoseconds -> seconds;
# confirm against the dataset).
withEventTime = streaming.selectExpr(
    "*",
    "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time",
)

In [None]:
from pyspark.sql.functions import window, col

# Count events per 10-minute event-time window and publish the full result
# to the in-memory table "pyevents_per_window".
(
    withEventTime
    .groupBy(window(col("event_time"), "10 minutes"))
    .count()
    .writeStream
    .queryName("pyevents_per_window")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Print the schema of the in-memory result table.
(
    spark
    .sql("SELECT * FROM pyevents_per_window")
    .printSchema()
)

In [None]:
# Show the first 5 rows currently held in the in-memory result table.
(
    spark
    .sql("SELECT * FROM pyevents_per_window")
    .show(5)
)

In [None]:
# Grouping on multiple keys applies to any window-style aggregation
# (or stateful computation) we would like:

In [None]:
from pyspark.sql.functions import window, col

# Count events per 10-minute event-time window and per User, publishing the
# full result to the in-memory table "pyevents_per_window2".
(
    withEventTime
    .groupBy(window(col("event_time"), "10 minutes"), "User")
    .count()
    .writeStream
    .queryName("pyevents_per_window2")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Print the schema of the window-and-User result table.
(
    spark
    .sql("SELECT * FROM pyevents_per_window2")
    .printSchema()
)

Of importance is the fact that we can also perform an aggregation on multiple columns, including the event time column. Just like we saw in the previous chapter, we can even perform these aggregations using methods like cube. 

In [None]:
from pyspark.sql.functions import window, col

# Count events per sliding window: 10-minute windows that start every
# 5 minutes.
#
# The query name must be unique among active queries. An earlier cell already
# started a query named "pyevents_per_window" that is never stopped, so
# reusing that name here makes start() fail. This query therefore gets its
# own name (and its own in-memory table).
(
    withEventTime
    .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
    .count()
    .writeStream
    .queryName("pyevents_per_sliding_window")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Show the first 5 rows of the window-and-User result table.
(
    spark
    .sql("SELECT * FROM pyevents_per_window2")
    .show(5)
)

+--------------------+----+------+
|              window|User| count|
+--------------------+----+------+
|{2015-02-24 12:20...|   f|133623|
|{2015-02-24 13:00...|   f| 33366|
|{2015-02-24 14:50...|   e|126282|
|{2015-02-23 14:30...|   h| 94669|
|{2015-02-24 14:10...|   e| 67577|
+--------------------+----+------+

In this example, we have 10-minute windows, starting every five minutes. 

Therefore each event will fall into two different windows. You can tweak this further according to your needs:

In [None]:
from pyspark.sql.functions import window, col

# Sliding-window count: 10-minute windows starting every 5 minutes, with the
# full result published to the in-memory table "pyevents_per_window3".
(
    withEventTime
    .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
    .count()
    .writeStream
    .queryName("pyevents_per_window3")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Print the schema of the sliding-window result table.
(
    spark
    .sql("SELECT * FROM pyevents_per_window3")
    .printSchema()
)

In [None]:
# Show the first 5 rows of the sliding-window result table.
(
    spark
    .sql("SELECT * FROM pyevents_per_window3")
    .show(5)
)

+--------------------+------+
|              window| count|
+--------------------+------+
|{2015-02-23 14:15...|107668|
|{2015-02-24 11:50...|150773|
|{2015-02-24 13:00...|133323|
|{2015-02-22 00:35...|    35|
|{2015-02-23 12:30...|100853|
+--------------------+------+

Handling Late Data with Watermarks

Concretely, a watermark is an amount of time following a given event or set of events after which we do not expect to see any more data from that time. We know data can arrive late due to delays on the network, devices that lose a connection, or any number of other issues.

Suppose we specify a watermark of 10 minutes. When doing this, we instruct Spark that any event that occurs more than 10 “event-time” minutes past a previous event should be ignored. (The code below uses a 30-minute watermark.)

In [None]:
from pyspark.sql.functions import window, col

# Same sliding-window count (10-minute windows every 5 minutes), but with a
# 30-minute watermark declared on event_time before aggregating. Results go
# to the in-memory table "pyevents_per_window4".
# NOTE(review): with outputMode("complete") Spark may not use the watermark
# to drop old state -- confirm against the Structured Streaming guide.
(
    withEventTime
    .withWatermark("event_time", "30 minutes")
    .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
    .count()
    .writeStream
    .queryName("pyevents_per_window4")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Print the schema of the watermarked result table.
(
    spark
    .sql("SELECT * FROM pyevents_per_window4")
    .printSchema()
)

In [None]:
# Show the first 5 rows of the watermarked result table.
(
    spark
    .sql("SELECT * FROM pyevents_per_window4")
    .show(5)
)

+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2015-02-23 14:15...|26936|
|{2015-02-24 11:50...|37714|
|{2015-02-24 13:00...|33324|
|{2015-02-22 00:35...|    6|
|{2015-02-23 12:30...|25218|
+--------------------+-----+
only showing top 5 rows

Dropping Duplicates in a Stream

One of the more difficult operations in record-at-a-time systems is removing duplicates from the stream.  ...  A perfect example of this is Internet of Things (IoT) applications, in which upstream producers generate messages in nonstable network environments and the same message might end up being sent multiple times. Your downstream applications and aggregations should be able to assume that there is only one of each message.


Essentially, Structured Streaming makes it easy to take message systems that provide at-least-once semantics, and convert them into exactly-once by dropping duplicate messages as they come in, based on arbitrary keys. To de-duplicate data, Spark will maintain a number of user specified keys and ensure that duplicates are ignored.

The core assumption is that duplicate events will have the same timestamp as well as identifier. In this model, rows with two different timestamps are two different records:

In [None]:
from pyspark.sql.functions import expr

# Declare a 5-second watermark on event_time, drop rows that repeat the same
# (User, event_time) pair, then count the remaining rows per User. The full
# result is published to the in-memory table "pydeduplicated".
(
    withEventTime
    .withWatermark("event_time", "5 seconds")
    .dropDuplicates(["User", "event_time"])
    .groupBy("User")
    .count()
    .writeStream
    .queryName("pydeduplicated")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Print the schema of the de-duplicated per-User counts.
(
    spark
    .sql("select * from pydeduplicated")
    .printSchema()
)

In [None]:
# Show up to 10 rows of the de-duplicated per-User counts.
(
    spark
    .sql("select * from pydeduplicated")
    .show(10)
)

+----+-----+
|User|count|
+----+-----+
|   a|80850|
|   b|91230|
|   c|77150|
|   g|91679|
|   h|77330|
|   e|96897|
|   f|92060|
|   d|81240|
|   i|92550|
+----+-----+