# Transforming Streaming Data

Most of the common operations on DataFrame/Dataset are supported for streaming.

We have already seen that operations such as `filter`, `groupBy`, and `select` can be applied to streaming DataFrames.

### Transformation Caveats

Some operations offered by the standard DataFrame API do not make sense in a streaming context, or would be very difficult to implement on unbounded data.

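The following operations are **not supported** on streaming Datasets:

* `limit` / `take(n)`
* `distinct` - deduplicating an unbounded stream would require keeping every record seen so far in state, with no upper bound on memory. Use `dropDuplicates` instead, ideally with a watermark (see *Deduplication* below)
* some outer joins - which ones are allowed depends on the join type and on whether each side is streaming or static
* `count` - a streaming Dataset cannot return a single count. Instead, aggregate first, e.g. `words.groupBy().count()`, which returns a streaming Dataset holding a running count that is updated with every micro-batch
* `show` - requires immediate materialization of the query. Use the `console` sink instead
* `foreach` - cannot be called directly on a stream, but the `foreach` sink (`writeStream.foreach(...)`) provides the same functionality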
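The idea behind a streaming count can be illustrated without Spark. The sketch below is a plain-Python simulation (the function `running_word_counts` is hypothetical, not a PySpark API): it keeps the aggregation state between micro-batches and emits the whole result table after each one, which is what a grouped count does in the `complete` output mode.

```python
from collections import Counter
from typing import Dict, Iterable, Iterator, List


def running_word_counts(micro_batches: Iterable[List[str]]) -> Iterator[Dict[str, int]]:
    """Simulate groupBy("word").count() on a stream in complete output mode.

    The aggregation state survives across micro-batches, and after each batch
    the whole updated result table is emitted.
    """
    state: Counter = Counter()
    for batch in micro_batches:
        state.update(batch)   # fold the new rows into the running aggregate
        yield dict(state)     # complete mode: emit the entire result table


batches = [["hello", "hello"], ["spark", "hello"], []]
for batch_id, table in enumerate(running_word_counts(batches)):
    print(f"Batch {batch_id}: {table}")
# Batch 0: {'hello': 2}
# Batch 1: {'hello': 3, 'spark': 1}
# Batch 2: {'hello': 3, 'spark': 1}
```

There is never a "final" count: an empty micro-batch simply re-emits the current totals.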


## Event time processing

*Event time* is the timeline on which events were produced.
In contrast, *processing time* is the timeline on which events are ingested by the engine; it is based on the clock of the machines processing the event stream. It is the “now” at the moment events enter the processing engine.

Structured Streaming can process data based on **event time**: it infers the timeline from a timestamp field in the data and tracks a continuously increasing upper bound of that field (the maximum event time seen so far). This timeline serves as the primary clock for time-based processing, which decouples when events were generated from when they are processed.
In particular, we can replay a sequence of past events and still produce correct results for all event-time aggregations. For example, we could replay a week's worth of events in a few minutes and have the system produce results consistent with the original one-week period.
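A minimal plain-Python sketch (no Spark; `daily_counts` is a hypothetical helper, and daily buckets stand in for event-time windows) of why replay works: the grouping key comes from the timestamp carried by each event, not from the clock of the processing machine, so processing a week of events all at once and in a shuffled order yields the same per-day counts. It also tracks the maximum event time seen, the upper bound mentioned above.

```python
import random
from collections import Counter
from datetime import datetime, timedelta
from typing import Iterable, Optional, Tuple

Event = Tuple[datetime, str]  # (event time carried by the record, payload)


def daily_counts(events: Iterable[Event]) -> Tuple[Counter, Optional[datetime]]:
    """Count events per (event-time day, word) and track the max event time seen."""
    counts: Counter = Counter()
    max_event_time: Optional[datetime] = None
    for event_time, word in events:
        counts[(event_time.date(), word)] += 1   # grouping uses event time only
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time          # monotonically increasing upper bound
    return counts, max_event_time


# One week of events, one every 6 hours.
start = datetime(2024, 4, 22)
week = [(start + timedelta(hours=6 * i), "hello") for i in range(28)]

# "Live" processing: events handled in event-time order.
live_counts, live_max = daily_counts(week)

# Replay: the same events processed all at once and in a different order.
replayed = week[:]
random.Random(0).shuffle(replayed)
replay_counts, replay_max = daily_counts(replayed)

print(live_counts == replay_counts, live_max == replay_max)  # True True
```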



### Time-Based Window aggregations

Time-based window aggregation in Spark Structured Streaming groups events into time intervals, such as 15-minute or one-hour windows, and then computes aggregate functions over each interval. This allows us to analyze the properties of events over specific periods of time.
This fits the very idea of stream processing: the system is long-running and deals with a continuous stream of data, and as new events keep arriving, older ones usually become less and less relevant.


Window aggregations are declared by using the `window` function as a grouping criterion. The `window` function **must** be applied to the column that we want to use as event time.

##### Example

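Let's reuse the *word_count* example from the introduction to illustrate window aggregations. With the `includeTimestamp` option, the socket source adds a `timestamp` column holding the time at which each line was received, and we use that column as the event time.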

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window

spark = SparkSession.builder.appName("TimeAggregations").getOrCreate()


# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .option('includeTimestamp', 'true') \
    .load()

# Split the lines into words
words = lines.select(
        explode(split(lines.value, ' ')).alias('word'),
        lines.timestamp
    )

24/04/25 22:08:44 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [2]:
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count().orderBy("window")


# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .option('truncate', 'false')\
        .start()

24/04/25 22:08:50 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-97701a28-2275-40aa-a594-88797f533890. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/25 22:08:50 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |word |count|
+------------------------------------------+-----+-----+
|{2024-04-25 22:00:00, 2024-04-25 22:10:00}|hello|4    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hello|4    |
+------------------------------------------+-----+-----+



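Parameters of the `window` function:
* `timeColumn`: `Column` or `str` -> the event-time column; must be of type `TimestampType` or `TimestampNTZType`
* `windowDuration`: `str` -> width of the window, e.g. `"10 minutes"`.
    Valid interval units are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
* `slideDuration`: `str` -> how often a new window starts. Must be less than or equal to `windowDuration`. If not provided, the windows are tumbling (non-overlapping, with a slide equal to `windowDuration`)
* `startTime`: `str` -> offset, relative to 1970-01-01 00:00:00 UTC, applied to window alignment. For example, offsetting 10-minute windows that slide every 5 minutes by 2 minutes yields intervals such as *00:02-00:12, 00:07-00:17, 00:12-00:22, ...*

With a 10-minute window sliding every 5 minutes, as in the query above, every event falls into two overlapping windows, which is why each `hello` is counted in both the 22:00-22:10 and the 22:05-22:15 windows.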
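To make these parameters concrete, here is a plain-Python sketch of how an event timestamp is mapped to windows. The function `assign_windows` is hypothetical (not part of PySpark); it assumes, as Spark does, that window starts are aligned to the Unix epoch shifted by `startTime`, and it uses naive datetimes as stand-ins for UTC timestamps.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

EPOCH = datetime(1970, 1, 1)  # windows are aligned to the epoch, shifted by start_time


def assign_windows(ts: datetime,
                   window_duration: timedelta,
                   slide_duration: Optional[timedelta] = None,
                   start_time: timedelta = timedelta(0)) -> List[Tuple[datetime, datetime]]:
    """Return every half-open [start, end) window that contains ts.

    Window starts are EPOCH + start_time + k * slide_duration for integer k;
    without slide_duration the windows are tumbling (slide == window).
    """
    if slide_duration is None:
        slide_duration = window_duration
    if not timedelta(0) < slide_duration <= window_duration:
        raise ValueError("slide_duration must be positive and <= window_duration")
    # Latest window start that is <= ts.
    start = ts - (ts - EPOCH - start_time) % slide_duration
    windows = []
    while start + window_duration > ts:   # walk back while the window still covers ts
        windows.append((start, start + window_duration))
        start -= slide_duration
    return sorted(windows)


event = datetime(2024, 4, 25, 22, 8)
for begin, end in assign_windows(event, timedelta(minutes=10), timedelta(minutes=5)):
    print(begin.strftime("%H:%M"), "-", end.strftime("%H:%M"))
# 22:00 - 22:10
# 22:05 - 22:15
```

An event at 22:08 lands in the same two windows reported in Batch 1 above; passing `start_time=timedelta(minutes=2)` instead shifts every window start by two minutes.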

### Watermarks

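A watermark is a time threshold that dictates how long we wait for late events before declaring them too late. It trails the maximum event time seen so far by the delay passed to `withWatermark`; once the watermark moves past the end of a window, that window can be finalized, and events that arrive later for it are discarded. Spark guarantees that events within the delay are not dropped, while older events are only *eligible* to be dropped.

Note that watermark-based state cleanup only applies to the `append` and `update` output modes. The query below uses `complete` mode (which `orderBy` requires here), so Spark must keep every window, and old windows keep appearing in the output.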
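The mechanism can be sketched in plain Python. This is a simplified model, not Spark's implementation, and every name in it is hypothetical. It assumes tumbling windows aligned to the epoch, recomputes the watermark after each micro-batch as `max event time - delay` (never moving backwards), and, as in `append` mode, emits a window and evicts its state once the window end is at or before the watermark.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import List, Tuple

EPOCH = datetime(1970, 1, 1)        # initial watermark: nothing is late yet
Event = Tuple[datetime, str]        # (event time, word)
Row = Tuple[datetime, str, int]     # (window start, word, final count)


def window_start(ts: datetime, size: timedelta) -> datetime:
    """Start of the epoch-aligned tumbling window containing ts."""
    return ts - (ts - EPOCH) % size


def run_with_watermark(batches: List[List[Event]], size: timedelta,
                       delay: timedelta) -> Tuple[List[List[Row]], List[Event]]:
    """Simplified model of a watermarked tumbling-window count in append mode."""
    state: Counter = Counter()      # (window start, word) -> count, open windows only
    watermark = EPOCH
    max_event_time = EPOCH
    emitted: List[List[Row]] = []   # rows finalized after each micro-batch
    dropped: List[Event] = []       # events whose window was already closed
    for batch in batches:
        for ts, word in batch:
            max_event_time = max(max_event_time, ts)
            start = window_start(ts, size)
            if start + size <= watermark:   # window already finalized: too late
                dropped.append((ts, word))
                continue
            state[(start, word)] += 1
        # The watermark trails the max event time by `delay` and never moves back.
        watermark = max(watermark, max_event_time - delay)
        # Append mode: emit windows the watermark has passed and evict their state.
        closed = sorted(key for key in state if key[0] + size <= watermark)
        emitted.append([(ws, w, state.pop((ws, w))) for ws, w in closed])
    return emitted, dropped


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 4, 25, hour, minute)


batches = [
    [(at(22, 1), "hello"), (at(22, 8), "hello")],   # watermark -> 21:58
    [(at(22, 25), "spark")],                         # watermark -> 22:15
    [(at(22, 5), "hello"), (at(22, 12), "hello")],   # one late, one still accepted
]
emitted, dropped = run_with_watermark(batches, timedelta(minutes=10), timedelta(minutes=10))
```

With this input, the 22:00-22:10 `hello` window (count 2) is emitted after the second batch. In the third batch, the 22:05 event is dropped because its window is already closed, while the 22:12 event is still counted even though it is older than the 22:15 watermark, because its 22:10-22:20 window is still open.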

In [3]:
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count().orderBy("window")

In [4]:
query_with_watermark = windowedCounts.writeStream.outputMode("complete").format("console") \
    .option('truncate', 'false') \
    .queryName("window_counts_watermarked").start()

24/04/25 22:10:53 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-060db00b-a626-4ed3-8a52-26f3b6561073. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/25 22:10:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |word |count|
+------------------------------------------+-----+-----+
|{2024-04-25 22:00:00, 2024-04-25 22:10:00}|hello|4    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hello|5    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hello|1    |
+------------------------------------------+-----+-----+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |word  |count|
+------------------------------------------+------+-----+
|{2024-04-25 22:00:00, 2024-04-25 22:10:00}|hello |4    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hello |5    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|apache|1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|spark |1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hello |1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|apache|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|spark |1    |
+------------------------------------------+------+-----+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |word  |count|
+------------------------------------------+------+-----+
|{2024-04-25 22:00:00, 2024-04-25 22:10:00}|hello |4    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hello |6    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|apache|1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|spark |1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hello |2    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|apache|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|spark |1    |
+------------------------------------------+------+-----+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |word  |count|
+------------------------------------------+------+-----+
|{2024-04-25 22:00:00, 2024-04-25 22:10:00}|hello |4    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hello |7    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|apache|1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|spark |1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hello |3    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|apache|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|spark |1    |
+------------------------------------------+------+-----+



### Deduplication

Structured Streaming offers a built-in function, `dropDuplicates`, that removes duplicate records from the stream. Optionally, a watermark determines when it is safe to discard previously seen keys.

##### Record deduplication without watermark

This requires storing every value received for the set of fields that defines a unique record, and that state can grow without bound. (Discouraged.)

**Example**

In [None]:
words.dropDuplicates(["word"])
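In plain Python (a Spark-free sketch; the class name is hypothetical), deduplication without a watermark amounts to a `seen` set that is consulted for every record and never shrinks: every distinct key ever observed stays in state for the lifetime of the query.

```python
from typing import Hashable, List, Set


class UnboundedDeduplicator:
    """Deduplication without a watermark: the set of seen keys only ever grows."""

    def __init__(self) -> None:
        self.seen: Set[Hashable] = set()

    def process(self, batch: List[Hashable]) -> List[Hashable]:
        out: List[Hashable] = []
        for key in batch:
            if key not in self.seen:   # first time this key appears in the stream
                self.seen.add(key)
                out.append(key)
        return out                     # nothing is ever removed from self.seen


dedup = UnboundedDeduplicator()
print(dedup.process(["hello", "spark", "hello"]))  # ['hello', 'spark']
print(dedup.process(["hello", "hadoop"]))          # ['hadoop']
print(len(dedup.seen))                             # 3
```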

##### Record deduplication with watermark



In [None]:
words.withWatermark("timestamp", "10 minutes").dropDuplicates(["word", "timestamp"])

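With a watermark, keys older than the watermark become eligible for deletion, which keeps the state store's storage needs bounded. For this to work, the event-time column must be part of the deduplication key, as in `dropDuplicates(["word", "timestamp"])`. Spark 3.5 and later also provide `dropDuplicatesWithinWatermark` for deduplicating on other columns within the watermark delay.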
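A simplified plain-Python model of the bounded version (not Spark's implementation; the class and helper names are hypothetical). The key is `(word, event time)`, mirroring `dropDuplicates(["word", "timestamp"])`. After each micro-batch the watermark advances to `max event time - delay`, and keys whose event time is older than the watermark are forgotten: any duplicate of them would itself be late and dropped, so they can never be emitted twice.

```python
from datetime import datetime, timedelta
from typing import List, Set, Tuple

Key = Tuple[str, datetime]   # (word, event time), like dropDuplicates(["word", "timestamp"])


class WatermarkedDeduplicator:
    """Simplified model of deduplication with a watermark."""

    def __init__(self, delay: timedelta) -> None:
        self.delay = delay
        self.watermark = datetime(1970, 1, 1)       # nothing is late before the first batch
        self.max_event_time = datetime(1970, 1, 1)
        self.seen: Set[Key] = set()                 # the deduplication state

    def process(self, batch: List[Key]) -> List[Key]:
        out: List[Key] = []
        for word, ts in batch:
            self.max_event_time = max(self.max_event_time, ts)
            if ts < self.watermark:                 # late record: dropped
                continue
            if (word, ts) not in self.seen:         # first occurrence: emit it
                self.seen.add((word, ts))
                out.append((word, ts))
        # Advance the watermark, then evict keys older than it.
        self.watermark = max(self.watermark, self.max_event_time - self.delay)
        self.seen = {key for key in self.seen if key[1] >= self.watermark}
        return out


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 4, 25, hour, minute)


dedup = WatermarkedDeduplicator(timedelta(minutes=10))
first = dedup.process([("hello", at(22, 0)), ("hello", at(22, 0)), ("spark", at(22, 5))])
second = dedup.process([("hello", at(22, 30))])   # watermark -> 22:20, older keys evicted
third = dedup.process([("hello", at(22, 0))])     # late duplicate: dropped, not re-emitted
print(len(dedup.seen))                            # 1
```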

In [9]:
uniqueCounts = words.withWatermark("timestamp", "10 minutes").dropDuplicates(["word", "timestamp"])

query_deduplication = uniqueCounts.writeStream.outputMode("append").format("console") \
    .option('truncate', 'false') \
    .queryName("window_deduplication").start()

24/04/25 22:27:51 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a5c4995b-a91b-4942-944d-0bb094eb9ea5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/25 22:27:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+---------+
|word|timestamp|
+----+---------+
+----+---------+



                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |word  |count|
+------------------------------------------+------+-----+
|{2024-04-25 22:00:00, 2024-04-25 22:10:00}|hello |4    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hello |7    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|apache|1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|spark |1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hello |3    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|apache|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|spark |1    |
|{2024-04-25 22:20:00, 2024-04-25 22:30:00}|hello |1    |
|{2024-04-25 22:25:00, 2024-04-25 22:35:00}|hello |1    |
+------------------------------------------+------+-----+



                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |word  |count|
+------------------------------------------+------+-----+
|{2024-04-25 22:00:00, 2024-04-25 22:10:00}|hello |4    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hello |7    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|apache|1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|spark |1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hello |3    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|apache|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|spark |1    |
|{2024-04-25 22:20:00, 2024-04-25 22:30:00}|hello |2    |
|{2024-04-25 22:25:00, 2024-04-25 22:35:00}|hello |2    |
+------------------------------------------+------+-----+



                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |word  |count|
+------------------------------------------+------+-----+
|{2024-04-25 22:00:00, 2024-04-25 22:10:00}|hello |4    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hello |7    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|apache|1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|spark |1    |
|{2024-04-25 22:05:00, 2024-04-25 22:15:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hello |3    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|apache|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|hadoop|1    |
|{2024-04-25 22:10:00, 2024-04-25 22:20:00}|spark |1    |
|{2024-04-25 22:20:00, 2024-04-25 22:30:00}|helo  |1    |
|{2024-04-25 22:20:00, 2024-04-25 22:30:00}|hello |2    |
|{2024-04-25 22:25:00, 2024-04-25 22:35:00}|hello |2    |
|{2024-04-25 22:25:00, 2024-04-25