# 3. API using Datasets and DataFrames

`SparkSession` is the entry point for creating streaming DataFrames/Datasets.

## Creating streaming Datasets and DataFrames

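* `DataStreamReader` -- interface used to load a streaming DataFrame from a source (format, schema, options)
* `SparkSession.readStream` -- returns a `DataStreamReader` (a property in PySpark, the method `readStream()` in Scala/Java)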

### Input Sources

* __File Source__ - Reads files written to a directory as a stream of data. Files are processed in order of _modification time_. Files must be placed in the given directory atomically, which in most file systems can be achieved with a file move operation
	* supports glob paths, but not multiple comma-separated paths/globs
	* the `cleanSource` option archives or deletes completed files. This introduces overhead that slows down micro-batch processing, but it also reduces the cost of listing source files, which can be expensive
	* when `cleanSource` is enabled, the source path should not be used by multiple sources or queries, and it should not match any files in the output directory of a file stream sink
	* delete and move actions are best effort. Failing to delete or move a file will not fail the streaming query
* __Kafka Source__ - Reads data from a Kafka source
* __Socket Source (for testing)__ - Reads UTF8 text data from a socket connection. The listening socket is at the driver. Use this only for testing, as it does not provide end-to-end fault-tolerance guarantees.
	* host and port numbers must be provided
* __Rate Source (for testing)__ - Generates data at the specified number of rows per second. Each output row contains a `timestamp` (Timestamp type) holding the time of message dispatch and a `value` (Long type) holding the message count. Intended for testing and benchmarking
	* the source will try its best to reach the `rowsPerSecond` parameter
	* `numPartitions` can be tweaked to help reach the desired speed
* __Rate Per Micro-Batch Source (for testing)__ - Provides a specified number of rows per micro-batch. Unlike the rate source, it provides a consistent set of input rows per micro-batch regardless of query execution (e.g. batch 0 produces values 0-99, batch 1 produces 100-199, ...). Both `timestamp` and `value` are also provided. Intended for testing and benchmarking
	* `rowsPerBatch` sets the number of rows per micro-batch; `numPartitions` can be tweaked as for the rate source
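
To make the "consistent set of inputs per batch" point concrete, here is a minimal plain-Python sketch of which values land in which micro-batch. The helper name `micro_batch_values` is made up for illustration and is not part of Spark; it only mirrors the numbering described above.

```python
def micro_batch_values(batch_id, rows_per_batch):
    """Values produced in one micro-batch when every batch emits
    rows_per_batch consecutive values (illustrative helper, not a Spark API)."""
    start = batch_id * rows_per_batch
    return range(start, start + rows_per_batch)


# First and last value of the first three batches with 100 rows per batch
for batch_id in range(3):
    values = micro_batch_values(batch_id, 100)
    print(batch_id, values[0], values[-1])
```

Because the range depends only on the batch id, re-running a batch yields the same inputs, which is what makes this source convenient for repeatable tests.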

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("API using Datasets and Datastructures").getOrCreate()

# Example 1 - socket stream
df_socket = spark \
	.readStream \
	.format('socket') \
	.option('host', 'localhost') \
	.option('port', 9999) \
	.load()

df_socket.isStreaming # True for DataFrames that have streaming sources (a property in PySpark, so no parentheses)

df_socket.printSchema()

# Example 2 - read all the CSV files written to a directory
userSchema = StructType().add('name', 'string').add('age', 'integer')
df_csv = spark \
	.readStream \
	.option('sep', ';') \
	.schema(userSchema) \
	.csv('path/to/directory')

Some operations (e.g. `map`, `flatMap`) need the type to be known at compile time. The streaming DataFrames above are untyped, meaning the schema is only checked at runtime. Untyped streaming DataFrames can be converted to typed streaming Datasets using the same methods as for static DataFrames.

_Refer to 2a. Getting Started - Programmatically specifying the schema to know how to do this_

## Schema inference and partitioning of streaming DataFrames/Datasets

By default, file-based sources require the schema to be specified for Structured Streaming. For ad-hoc use cases, schema inference can be re-enabled by setting `spark.sql.streaming.schemaInference` to `true`.

Partition discovery occurs when subdirectories named `/key=value/` are present, and listing will automatically recurse into these directories. The directories that make up the partitioning scheme must be present when the query starts and must remain static, i.e. the set of partition columns cannot change while the query runs.

For example, it is okay to add `/data/year=2016/` when `/data/year=2015/` was present, but it is invalid to change the partitioning column (i.e. by creating the directory `/data/date=2016-04-17/`).
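
The rule can be illustrated with a small plain-Python check. This is only a sketch of the rule as stated above, not how Spark performs partition discovery; the helper names `partition_columns` and `keeps_partition_scheme` are made up for this example.

```python
from pathlib import PurePosixPath


def partition_columns(path):
    """Return the partition column names encoded as key=value path segments."""
    return [part.split("=", 1)[0] for part in PurePosixPath(path).parts if "=" in part]


def keeps_partition_scheme(existing, new):
    """True if the new directory uses the same partition columns as the existing one."""
    return partition_columns(existing) == partition_columns(new)


# Adding another value for the same partition column is fine
print(keeps_partition_scheme("/data/year=2015/", "/data/year=2016/"))
# Switching to a different partition column changes the scheme
print(keeps_partition_scheme("/data/year=2015/", "/data/date=2016-04-17/"))
```

The first call compares `['year']` with `['year']`, the second compares `['year']` with `['date']`, which is the invalid case from the example above.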

---

# Operations on streaming DataFrames/Datasets

Both untyped, SQL-like operations (SELECT, WHERE, GROUPBY) and RDD-like operations (map, filter, flatMap) are applicable

## Basic Operations - Selection, Projection, Aggregation

Most common operations on DataFrames/Datasets are supported for streaming versions

In [1]:
# streaming dataframe with IOT device data with schema
# {device: string, deviceType: string, signal: double, time: TimeStamp}
df = spark \
	.readStream \
	.format('socket') \
	.option('host', 'localhost') \
	.option('port', 9999) \
	.load()

# Select the devices which have signal more than 10
df.select('device').where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()

# Register streaming DataFrame as a temporary view and apply SQL commands
df.createOrReplaceTempView('updates')
spark.sql("SELECT count(*) FROM updates")

# Identify whether the DataFrame has streaming data (isStreaming is a property in PySpark)
df.isStreaming

## Window Operations on Event Time

Aggregations over a sliding event-time window work much like grouped aggregations. Aggregate values are maintained for each window that the event time of a row falls within.

Example: count the words within 10 minute windows that update every 5 minutes (e.g. 12:00-12:10, 12:05-12:15, 12:10-12:20, ...). A word that arrives at 12:07 should update the counts in 2 windows: 12:00-12:10 and 12:05-12:15. Counts are indexed by both the grouping key and the window. Illustrated below:

![streaming window example](https://spark.apache.org/docs/latest/img/structured-streaming-window.png)

* `window(<column>, <window_interval>, <sliding_interval>)` - create a window

In [None]:
from pyspark.sql.functions import window
# schema - {timestamp: Timestamp, word: String}
words = spark \
	.readStream \
	.format('socket') \
	.option('host', 'localhost') \
	.option('port', 9999) \
	.load()

# Group the data by window and word and compute the count of each group
windowed_count = words.groupBy(
	window(words.timestamp, '10 minutes', '5 minutes'),
	words.word
).count()
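
To see why a single event can update more than one window, the window assignment can be worked through in plain Python. This is a sketch under the assumption that windows start on multiples of the slide interval counted from the Unix epoch; `sliding_windows` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta


def sliding_windows(event_time, window, slide):
    """Return every (start, end) window of length `window`, starting on
    multiples of `slide`, whose half-open range [start, end) contains event_time."""
    epoch = datetime(1970, 1, 1)
    # Latest window start at or before the event, aligned to the slide interval
    start = event_time - (event_time - epoch) % slide
    windows = []
    # Walk backwards while the window still covers the event
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


event = datetime(2024, 1, 1, 12, 7)
for start, end in sliding_windows(event, timedelta(minutes=10), timedelta(minutes=5)):
    print(start.time(), "-", end.time())
```

For the 12:07 event this yields the 12:00-12:10 and 12:05-12:15 windows. When the slide equals the window length the same function returns exactly one window, which is the tumbling case.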

## Handling Late Data and Watermarking

Structured Streaming can maintain the intermediate state for partial aggregates for a long period of time, so that late data can still update the aggregates of old windows correctly.

However, the system needs to bound the amount of intermediate in-memory state it accumulates, i.e. it needs to know when an old aggregate can be dropped from the in-memory state.

_Watermarking_ - specify the event-time column and a threshold on how late the data is expected to be in terms of event time. The watermark is the maximum event time seen by the engine minus the late threshold. For a window ending at time $T$, the engine keeps the state and allows late data to update it until $(max\ event\ time\ seen\ by\ engine - late\ threshold > T)$. Data that arrives later than the threshold may be dropped.

* `.withWatermark(<column>, <threshold>)` - timeframe for late data

In [None]:
# schema - {timestamp: Timestamp, word: String}
words = spark \
	.readStream \
	.format('socket') \
	.option('host', 'localhost') \
	.option('port', 9999) \
	.load()

windowed_counts = words \
	.withWatermark('timestamp', '10 minutes') \
	.groupBy(
		window(words.timestamp, '10 minutes', '5 minutes'),
		words.word
	) \
	.count()

Example of how watermarking will work

![watermark example](https://spark.apache.org/docs/latest/img/structured-streaming-watermark-update-mode.png)
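
The watermark arithmetic can also be followed with a toy model in plain Python. This is a simplified sketch, not Spark's implementation: the class `WatermarkTracker` is made up for illustration, and it updates the watermark on every event, whereas Spark advances it between micro-batches.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Toy model of an event-time watermark (illustrative only)."""

    def __init__(self, threshold):
        self.threshold = threshold
        self.max_event_time = None

    @property
    def watermark(self):
        """Maximum event time seen so far minus the late threshold."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.threshold

    def observe(self, event_time):
        """Record an event time; the watermark never moves backwards."""
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    def window_is_expired(self, window_end):
        """State for a window ending at window_end can be dropped once the watermark passes it."""
        return self.watermark is not None and self.watermark > window_end


tracker = WatermarkTracker(timedelta(minutes=10))
tracker.observe(datetime(2024, 1, 1, 12, 14))
# Watermark is 12:04, so the 12:00-12:10 window is still open for late data
print(tracker.watermark, tracker.window_is_expired(datetime(2024, 1, 1, 12, 10)))
tracker.observe(datetime(2024, 1, 1, 12, 21))
# Watermark is now 12:11, which is past the window end of 12:10
print(tracker.watermark, tracker.window_is_expired(datetime(2024, 1, 1, 12, 10)))
```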

## Types of time windows

1. __Tumbling Windows__ - a series of fixed-size, non-overlapping and contiguous time intervals. An input can only be bound to a single window
2. __Sliding Windows__ - also fixed-size, but windows can overlap if the slide duration is smaller than the window duration. Inputs can be bound to multiple windows
3. __Session Windows__ - a session window starts when an input is received and is extended whenever a new input is received within the specified gap duration. If no input is received within the gap, the window closes, and the next input starts a new window. Session windows are therefore dynamic in length
	* functions/expressions can be used to specify the gap duration dynamically based on the input row. With dynamic gap durations, the closing of a session window no longer depends on the latest input alone; its range is the union of all events' ranges

1 and 2 use `window()`, 3 uses `session_window()`

In [None]:
from pyspark.sql.functions import session_window

events = spark \
	.readStream \
	.format('socket') \
	.option('host', 'localhost') \
	.option('port', 9999) \
	.load()

# Static session_window
static_sessionized_counts = events \
	.withWatermark('timestamp', '10 minutes') \
	.groupBy(
		session_window(events.timestamp, '5 minutes'),
		events.userId
	).count()

# Dynamic session_window - gap duration depends on the input row
from pyspark.sql import functions as F

# Use a different name so the imported session_window function is not shadowed
dynamic_window = session_window(
	events.timestamp,
	F.when(events.userId == 'user1', '5 seconds').when(events.userId == 'user2', '20 seconds').otherwise('5 minutes')
)

dynamic_sessionized_counts = events \
	.withWatermark('timestamp', '10 minutes') \
	.groupBy(
		dynamic_window,
		events.userId
	).count()
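
How a session window grows and closes can be seen in a few lines of plain Python. This is a minimal sketch for a static gap duration only; `sessionize` is a hypothetical helper and does not reflect how Spark merges sessions internally.

```python
from datetime import datetime, timedelta


def sessionize(event_times, gap):
    """Group event times into sessions. Each session is (start, end) with
    end = last event time + gap; an event arriving before `end` extends the session."""
    sessions = []
    for ts in sorted(event_times):
        if sessions and ts < sessions[-1][1]:
            # Event falls inside the open session, so push the session end out
            start, _ = sessions[-1]
            sessions[-1] = (start, ts + gap)
        else:
            # Gap elapsed with no input, so a new session starts
            sessions.append((ts, ts + gap))
    return sessions


times = [datetime(2024, 1, 1, 12, m) for m in (0, 3, 7, 20)]
for start, end in sessionize(times, timedelta(minutes=5)):
    print(start.time(), "-", end.time())
```

The events at 12:00, 12:03 and 12:07 chain into one session ending at 12:12, while the 12:20 event starts a second session ending at 12:25.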

## Conditions for watermarking to clean aggregation state

The following conditions must be satisfied for watermarking to clean the state in aggregation queries:

1. Output mode must be Append or Update
2. The aggregation must have either the event-time column or a `window` on the event-time column
3. `withWatermark` must be called on the same column as the timestamp column used in the aggregation
4. `withWatermark` must be called before the aggregation for the watermark details to be used

## Semantic Guarantees of Aggregation with Watermarking

* A watermark delay of "2 hours" guarantees that the engine never drops data that is less than 2 hours delayed, i.e. any data less than 2 hours behind (in event time) the latest data processed so far is guaranteed to be aggregated
* The guarantee is strict in one direction only. Data delayed by more than 2 hours is not guaranteed to be dropped; it may or may not be aggregated. The more delayed the data is, the less likely it is to be aggregated

---

# Join Operations

Structured Streaming supports joining a streaming DataFrame with a static DataFrame or with another streaming DataFrame. The result of a streaming join is generated incrementally (similar to aggregation queries). In every case, the result is the same as if the join had been run on static DataFrames containing the same data

## Stream-static Joins

Inner joins and some types of outer joins are supported between a streaming and a static DataFrame. Stream-static joins are not stateful, so no state management is required

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("joins").getOrCreate()

df_static = spark.read.json('players.json') # file does not exist
df_stream = spark \
	.readStream \
	.format('socket') \
	.option('host', 'localhost') \
	.option('port', 9999) \
	.load()

df_stream.join(df_static, 'type') # inner equi-join with a static DF
df_stream.join(df_static, 'type', 'left_outer') # left outer join with a static DF

## Stream-stream Joins

At any point in time, both DataFrames are still incomplete, meaning that current rows of the first DataFrame may match rows that have yet to arrive in the second DataFrame.

Hence, for both streams, Spark buffers past input as streaming state, so that every future input can still be matched with past input and results generated accordingly. It is also still able to handle late and out-of-order data based on watermarks.

Below are a few types of stream-stream joins

### Inner Joins with optional Watermarking

Inner joins on any kind of columns along with any kind of join conditions are supported. As the stream runs, the streaming state keeps growing, so to avoid unbounded state you have to define additional join conditions such that old inputs cannot match new inputs indefinitely, which allows them to be cleared from the state.

The following steps are required:

1. Define watermark delays on both inputs so that the engine knows how delayed the inputs can get
2. Define a constraint on event time across the 2 inputs so that the engine knows when old rows of one input will no longer match new rows of the other input. This can be defined in one of the following ways:
	* Time range join conditions (`... JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`)
	* Join on event-time windows (`... JOIN ON leftTimeWindow = rightTimeWindow`)
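
The reasoning behind a time range condition can be sketched in plain Python for the ad impression and click case (a click counts if it happens within 1 hour after the impression). The function names below are hypothetical, and the eviction check only conveys the idea; Spark's actual state bookkeeping is more involved.

```python
from datetime import datetime, timedelta


def click_matches_impression(impression_time, click_time, max_delay=timedelta(hours=1)):
    """Time range condition: impression_time <= click_time <= impression_time + max_delay."""
    return impression_time <= click_time <= impression_time + max_delay


def impression_can_be_dropped(impression_time, click_watermark, max_delay=timedelta(hours=1)):
    """Once the click watermark is past impression_time + max_delay, no click that
    is still accepted can satisfy the condition, so the buffered impression is useless."""
    return click_watermark > impression_time + max_delay


impression = datetime(2024, 1, 1, 12, 0)
# Click 30 minutes after the impression satisfies the range condition
print(click_matches_impression(impression, datetime(2024, 1, 1, 12, 30)))
# Click 90 minutes after the impression does not
print(click_matches_impression(impression, datetime(2024, 1, 1, 13, 30)))
# With the click watermark at 13:01 the impression can leave the state
print(impression_can_be_dropped(impression, datetime(2024, 1, 1, 13, 1)))
```

Without the upper bound on `click_time`, the second function could never return `True`, which is why an unconstrained join keeps state forever.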

In [None]:
# e.g matching advertisement air-times with customer clicks to correlate monetizable clicks
# ad impressions watermark = 2hrs | clicks watermark = 3hrs
# join time range = 1hr after impression is made

from pyspark.sql.functions import expr

impressions = spark.readStream.format('socket').option('host', 'localhost').option('port', 9999).load()
clicks = spark.readStream.format('socket').option('host', 'localhost').option('port', 9999).load()

# Apply watermarks on event-time columns
impressions_watermarked = impressions.withWatermark('impressionTime', '2 hours')
clicks_watermarked = clicks.withWatermark('clickTime', '3 hours')

# Join with event-time constraints
impressions_watermarked.join(
	clicks_watermarked,
	expr(
		'''
		clickAdId = impressionAdId AND
		clickTime >= impressionTime AND
		clickTime <= impressionTime + interval 1 hour
		'''
	)
)

### Outer Joins with Watermarking

The watermark + event-time constraints must be specified for outer joins, because the engine has to know when an input row is not going to match anything in the future.

Outer join queries will look like inner joins except it will specify an additional parameter to indicate it is an outer join

__Caveats__

* The outer NULL result will be generated with a _delay that matches the watermark delay and the time range condition_. This is to ensure that all delayed data is accounted for before returning the result
* If any of the 2 input streams being joined does not receive data for a while, the outer (both cases left and right) output may get delayed

In [None]:
impressions_watermarked.join(
	clicks_watermarked,
	expr(
		'''
		clickAdId = impressionAdId AND
		clickTime >= impressionTime AND
		clickTime <= impressionTime + interval 1 hour
		'''
	),
	'leftOuter' # can be 'inner', 'leftOuter', 'rightOuter', 'leftSemi'
)

### Semi Joins with Watermarking

Watermark + event-time constraint must be specified for semi joins. They have the same guarantees as inner joins regarding watermark delays and whether data gets dropped or not.

## Additional Details on Joins

* Joins can be cascaded, e.g. `df1.join(df2, ...).join(df3, ...)`
* Joins can only be used when the query is in Append output mode
* Non-map-like operations cannot be used before joins (e.g. streaming aggregations cannot be applied before a join)

---

# Streaming Deduplication

Deduplication is the removal of repeated instances of data, which decreases storage capacity requirements.

Can deduplicate records in the data using a unique identifier in the events. Query will store the necessary data from the previous records to perform the duplication check. Can use deduplication with or without watermarking.

* _With watermark_: If there is an upper bound on how late a duplicate record may arrive, then you can define a watermark on an event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to get any duplicates any more. This bounds the amount of the state the query has to maintain.
* _Without watermark_: no bounds on when a duplicate record may arrive, the query stores data from all past records as state

In [None]:
df_stream = spark.readStream.format('socket').option('host', 'localhost').option('port', 9999).load()

# Without watermark using guid column
# (column names are passed as a list, which works across PySpark versions)
df_stream.dropDuplicates(['guid'])

# With watermark using guid and eventTime columns
df_stream \
	.withWatermark('eventTime', '10 seconds') \
	.dropDuplicates(['guid', 'eventTime'])
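
How a watermark bounds the deduplication state can be pictured with plain Python. This is a simplified sketch, not Spark's implementation: `deduplicate` is a hypothetical helper, and it advances the watermark per record, whereas Spark advances it between micro-batches.

```python
from datetime import datetime, timedelta


def deduplicate(events, threshold):
    """Return the first occurrence of each (guid, event_time) pair while
    keeping only state that is newer than the watermark.

    events: (guid, event_time) tuples in arrival order.
    """
    seen = set()            # state: (guid, event_time) pairs still tracked
    max_event_time = None
    output = []
    for guid, event_time in events:
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        watermark = max_event_time - threshold
        # Evict state older than the watermark: duplicates that old are no longer expected
        seen = {(g, t) for g, t in seen if t >= watermark}
        if event_time < watermark:
            continue        # the record itself is too late and is dropped
        if (guid, event_time) not in seen:
            seen.add((guid, event_time))
            output.append((guid, event_time))
    return output


t0 = datetime(2024, 1, 1, 12, 0, 0)
events = [
    ("a", t0),
    ("a", t0),                              # duplicate, suppressed
    ("b", t0 + timedelta(seconds=5)),
    ("c", t0 + timedelta(seconds=30)),      # moves the watermark to t0 + 20s
    ("a", t0),                              # older than the watermark, dropped
]
print(deduplicate(events, timedelta(seconds=10)))
```

Without the eviction step the `seen` set would grow with every distinct record, which corresponds to the "without watermark" case.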

---

# Policy for handling multiple watermarks

When a query combines multiple watermarked streams (e.g. they are joined together), a single global watermark is used. By default, the minimum of the per-stream watermarks is chosen, which ensures that no data is accidentally dropped as too late if one stream falls behind. The global watermark therefore moves at the pace of the slowest stream, and the query output is delayed accordingly.

This policy can be changed by setting `spark.sql.streaming.multipleWatermarkPolicy` to `max` (the default is `min`), so that the global watermark moves at the pace of the fastest stream. The side effect is that data from the slower streams will be dropped more aggressively, so consider this before changing it
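
The difference between the two policies comes down to a `min` versus a `max` over the per-stream watermarks. The sketch below shows this in plain Python; `global_watermark` and the stream names are made up for illustration and are not Spark APIs.

```python
from datetime import datetime


def global_watermark(stream_watermarks, policy="min"):
    """Combine per-stream watermarks into one global watermark.

    stream_watermarks: dict of stream name -> current watermark.
    policy: "min" follows the slowest stream, "max" follows the fastest.
    """
    if policy == "min":
        return min(stream_watermarks.values())
    if policy == "max":
        return max(stream_watermarks.values())
    raise ValueError(f"unknown policy: {policy}")


watermarks = {
    "impressions": datetime(2024, 1, 1, 12, 0),   # slower stream
    "clicks": datetime(2024, 1, 1, 12, 45),       # faster stream
}
print(global_watermark(watermarks, "min"))
print(global_watermark(watermarks, "max"))
```

Under `max`, impressions with event times between 12:00 and 12:45 would already be behind the global watermark and could be treated as late, which is the side effect to weigh.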

---

# Arbitrary Stateful Operations

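Some use cases, such as sessionization (tracking sessions from a stream of events), need more advanced stateful operations than aggregations. These require saving arbitrary types of data as state and updating that state with each new event, which is done with `mapGroupsWithState`/`flatMapGroupsWithState` in Scala/Java (or `applyInPandasWithState` in recent PySpark versions).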

---

# Unsupported Operations

see documentation for list of unsupported operations

---

# Limitation of Global Watermark

In Append mode, if a stateful operation emits rows older than the current watermark + allowed late record delay, they are considered "late rows" in downstream stateful operation(s). These rows may be discarded and they can potentially cause a correctness issue.

Spark has 2 ways to check the number of late rows on stateful operators:

1. On Spark UI: check the metrics in the stateful operator nodes in the query execution details page in the SQL tab
2. On Streaming Query Listener: check "numRowsDroppedByWatermark" in "stateOperators" in QueryProgressEvent
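
For the second option, the metric can be summed across stateful operators once the progress information is available as a dictionary. The sketch below assumes a progress dict shaped like the one exposed by `StreamingQuery.lastProgress`; the helper `rows_dropped_by_watermark` and all values in `sample_progress` are made up for illustration.

```python
def rows_dropped_by_watermark(progress):
    """Sum numRowsDroppedByWatermark over all stateful operators in a progress dict."""
    return sum(
        op.get("numRowsDroppedByWatermark", 0)
        for op in progress.get("stateOperators", [])
    )


# Hand-written sample with invented numbers, only to exercise the helper
sample_progress = {
    "batchId": 7,
    "stateOperators": [
        {"numRowsTotal": 120, "numRowsUpdated": 15, "numRowsDroppedByWatermark": 3},
        {"numRowsTotal": 40, "numRowsUpdated": 2, "numRowsDroppedByWatermark": 0},
    ],
}
print(rows_dropped_by_watermark(sample_progress))
```

A non-zero total is a hint that some rows arrived after the watermark had already passed them and deserves a closer look at the watermark delay.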

---

# State Store

State store is a versioned key-value store that provides read and write operations. In Structured Streaming, it is used to handle stateful operations across batches. There are 2 built-in state store providers, but users can also implement their own state store provider.

1. HDFS state store provider
2. RocksDB state store implementation

---

# Starting Streaming Queries

Once the final result DataFrame has been defined, the streaming query needs to be started. Use the `DataStreamWriter` returned through `DataFrame.writeStream` (`Dataset.writeStream()` in Scala/Java) and specify the following.

* `DataStreamWriter` -- configures and starts the streaming query
	* _Details of output sink_ -- data format, location, etc.
	* _Output mode_ (default append) -- specify what gets written to the output sink
	* _Query name_ (optional) -- specify a unique name of the query for identification
	* _Trigger interval_ (optional) -- specify the trigger interval. If it is not specified, the system checks for availability of new data as soon as the previous processing has completed
		* if a trigger time is missed because the previous processing has not completed, the next processing starts immediately once it is done
	* _Checkpoint location_ -- specify the location where the system will write all checkpoint information

## Output Modes

Append (default), Complete, Update. Refer to documentation to see which modes are compatible with which query type

## Output Sinks

Types of built-in output sinks.

* File sink
	- `<path>` -- path to output directory
	- `<retention>` -- TTL for output files
* Kafka sink
* Foreach sink
* Console sink (for debugging)
	- `<numRows>` -- Number of rows to print every trigger (default: 20)
	- `<truncate>` -- Whether to truncate output if it's too long (default: true)
* Memory sink (for debugging)

Have to call `.start()` to start the execution of the query. It will return a `StreamingQuery` object which can be used to manage the query

In [None]:
## Output Sinks

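# File sink - stores output to a directory
# (file and Kafka sinks need a checkpoint location; the paths here are placeholders)
df_stream.writeStream \
	.format('parquet') \
	.option('path', 'path/to/destination') \
	.option('checkpointLocation', 'path/to/checkpoint/file_sink') \
	.start()

# Kafka sink - stores output to one or more Kafka topics
df_stream.writeStream \
	.format('kafka') \
	.option('kafka.bootstrap.servers', 'host1:port1,host2:port2') \
	.option('topic', 'updates') \
	.option('checkpointLocation', 'path/to/checkpoint/kafka_sink') \
	.start()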

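# Foreach sink - runs arbitrary computation on the records in the output
# process_row is a hypothetical placeholder for user-defined per-row logic
def process_row(row):
	print(row)

df_stream.writeStream \
	.foreach(process_row) \
	.start()

# Console sink - prints the output to the console/stdout every time there is a trigger. Only for debugging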
df_stream.writeStream \
	.format('console') \
	.start()

# Memory sink - stored in memory as an in-memory table. Only for debugging
df_stream.writeStream \
	.format('memory') \
	.queryName('tableName') \
	.start()

## Triggers