## Structured Streaming
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

### Overview

Structured Streaming is a scalable and fault-tolerant **stream processing engine** built on the **Spark SQL** engine.

![](https://dbconvert.com/blog/content/images/size/w2000/2021/11/Data-stream-processing.png)

https://dbconvert.com/blog/data-stream-processing/

You can express your streaming computation the same way you would express a batch computation on static data. 

The **Spark SQL engine** will take care of running it incrementally and continuously and updating the final result as **streaming data** continues to arrive. 

![](https://images.ctfassets.net/8vofjvai1hpv/36gl5VFGguEw5PxZizOJVu/b50f5e031efda3635a0d1b912e106243/Database_Streaming.png)
https://www.confluent.io/learn/data-streaming/

You can use the Dataset/DataFrame API in Scala, Java, Python or R to express:
- streaming aggregations
- event-time windows 
- stream-to-batch joins

The computation is executed on the same optimized Spark SQL engine. 

![](https://miro.medium.com/v2/resize:fit:1400/format:webp/1*DsRyFzVjioGKmpoRZW0VTg.png)

https://mohdizzy.medium.com/leverage-flink-windowing-to-process-streams-based-on-event-time-cdb87e9a1e21

Finally, the system ensures **end-to-end exactly-once fault-tolerance** guarantees through checkpointing and Write-Ahead Logs. 

![](https://dimosr.github.io/assets/img/posts/exactly_once.jpg)

https://dimosr.github.io/the-tale-of-exactly-once-semantics/

In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

![](https://i.imgflip.com/7kv31f.jpg)
[NicsMeme](https://imgflip.com/i/7kv31f)

A fake meme 
https://screenrant.com/matrix-meme-what-told-you-not-in-movie/

## Quick Example

```bash
./sparkNC.sh 9999
```

```bash
./sparkTap.sh structuredstreamingwc.py
```
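The contents of structuredstreamingwc.py are not shown here. As a rough sketch, assuming it follows the usual socket word count pattern, the core of such a script might look like the code below. The function names, the default host and the assumption that an existing SparkSession is passed in as `spark` are illustrative only; port 9999 matches the listener started above.

```python
def build_word_count(spark, host="localhost", port=9999):
    """Return a streaming DataFrame holding a running count per word.

    `spark` is an existing SparkSession; host and port are placeholders.
    """
    # Each line received on the socket becomes one row with a "value" column.
    lines = (
        spark.readStream.format("socket")
        .option("host", host)
        .option("port", port)
        .load()
    )
    # Split every line on spaces and emit one row per word.
    words = lines.selectExpr("explode(split(value, ' ')) AS word")
    # Running count of each distinct word seen so far.
    return words.groupBy("word").count()


def start_console_query(word_counts):
    """Start printing the full table of counts to the console on every trigger."""
    return word_counts.writeStream.outputMode("complete").format("console").start()
```

With a SparkSession called `spark`, `start_console_query(build_word_count(spark)).awaitTermination()` would keep the query running until it is stopped.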

## Programming Model

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. 

This leads to a new stream processing model that is very similar to a batch processing model. 

You express your streaming computation as a standard batch-like query, as if on a static table, and Spark runs it as an incremental query on the unbounded input table.

## Input Table

Data arriving on the stream is "inserted" as new rows into the Input Table.

![](https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png)

## Query 

A query on the input generates the "Result Table".

At every trigger interval, new rows are appended to the Input Table, which in turn "updates" the Result Table.

Changed result rows can then be written to an external sink.

![](https://spark.apache.org/docs/latest/img/structured-streaming-model.png)

## Output

The Output is the part of the Result Table that is written to external storage. It can be defined in one of three modes:

- Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

- Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

- Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
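The difference between the three modes is easier to see on a toy example. The following is a plain Python simulation, not Spark code: it keeps a running word count as the Result Table and records what each mode would write after every trigger. The function name and the sample words are made up for illustration. Append is shown for the raw, non-aggregated words, because the running counts keep changing and so would not qualify for Append mode as described above.

```python
from collections import Counter


def run_triggers(batches):
    """Simulate a running word count and what each output mode would emit.

    `batches` is a list of lists of words, one inner list per trigger.
    Returns one dict per trigger with the rows written in each mode.
    """
    result_table = Counter()  # the Result Table: word -> running count
    emitted = []
    for batch in batches:
        changed = Counter(batch)  # words whose counts change in this trigger
        result_table.update(changed)
        emitted.append({
            # Complete: the entire Result Table is written every time.
            "complete": dict(result_table),
            # Update: only the rows whose count changed since the last trigger.
            "update": {word: result_table[word] for word in changed},
            # Append: only new rows that will never change (here, the raw words).
            "append": list(batch),
        })
    return emitted


outputs = run_triggers([["cat", "dog"], ["dog", "owl"]])
for trigger_number, rows in enumerate(outputs, start=1):
    print(trigger_number, rows)
```

After the second trigger, Complete contains all three words, while Update contains only `dog` and `owl`, the two rows whose counts changed.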

## Quick Example Model

![](https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png)

## Notes

Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. 

It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).

This model is significantly different from many other stream processing engines. Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once). 

In this model, Spark is responsible for updating the Result Table when there is new data, thus relieving the users from reasoning about it. 

## API
Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data.

Similar to static Datasets/DataFrames, you can use the common entry point SparkSession to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as on static DataFrames/Datasets.

If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the [DataFrame/Dataset Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html).

### Create a streaming DataFrame/Dataset

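```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()

# Create a DataFrame representing the stream of input lines.
# readStream is a property returning a DataStreamReader; the socket source,
# host and port are example values standing in for your actual input.
lines = (spark.readStream.format("socket")
         .option("host", "localhost").option("port", 9999).load())
```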

## Input Sources

### File source 

Reads files written in a directory as a stream of data. Files will be processed in the order of file modification time. If latestFirst is set, order will be reversed. 

Supported file formats are text, CSV, JSON, ORC, Parquet. 

See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems can be achieved by file move operations.
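As an illustration of both points, the sketch below reads a directory of CSV files as a stream and shows one way to drop a file into that directory with a single move. The two-column schema, the function names and the staging approach are assumptions made for this example; `spark` is an existing SparkSession.

```python
import os
import tempfile


def read_csv_stream(spark, directory):
    """Return a streaming DataFrame over the CSV files placed in `directory`."""
    # Streaming file sources need an explicit schema by default; this
    # two-column layout is only an example.
    return spark.readStream.schema("name STRING, value INT").csv(directory)


def place_file_atomically(directory, filename, content):
    """Write `content` to a staging file, then move it into `directory`."""
    # Stage the file next to the watched directory (same file system), so the
    # final step is a single rename rather than a slow, visible write.
    parent = os.path.dirname(os.path.abspath(directory))
    staging_dir = tempfile.mkdtemp(dir=parent)
    staged_path = os.path.join(staging_dir, filename)
    with open(staged_path, "w", encoding="utf-8") as handle:
        handle.write(content)
    final_path = os.path.join(directory, filename)
    os.replace(staged_path, final_path)  # move the finished file into place
    os.rmdir(staging_dir)
    return final_path
```

Writing the file somewhere else first means the stream never sees a half-written file: it appears in the watched directory only once it is complete.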

### Kafka source 

Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher. 

See the [Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) for more details.
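A minimal sketch of subscribing to a single topic is shown below. The function name is made up, the broker list and topic are supplied by the caller, and `spark` is assumed to be an existing SparkSession started with the Kafka integration package on its classpath.

```python
def read_kafka_stream(spark, bootstrap_servers, topic):
    """Return a streaming DataFrame with the key and value of each Kafka record."""
    raw = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .load()
    )
    # Kafka delivers key and value as binary columns; cast them to strings.
    return raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```

For example, `read_kafka_stream(spark, "host1:port1,host2:port2", "updates")` would subscribe to a topic named `updates`.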

### Socket source (for testing) 

Reads UTF-8 text data from a socket connection.

The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.

### Rate source (for testing) 

Generates data at the specified number of rows per second. Each output row contains a timestamp and a value, where timestamp is of Timestamp type and contains the time of message dispatch, and value is of Long type and contains the message count, starting from 0 for the first row. This source is intended for testing and benchmarking.

### Rate Per Micro-Batch source (for testing) 

Generates data at the specified number of rows per micro-batch. Each output row contains a timestamp and a value, where timestamp is of Timestamp type and contains the time of message dispatch, and value is of Long type and contains the message count, starting from 0 for the first row.

Unlike the rate source, this source provides a consistent set of input rows per micro-batch regardless of query execution (trigger configuration, a lagging query, etc.). For example, batch 0 will produce rows 0 to 999, batch 1 will produce rows 1000 to 1999, and so on. The same applies to the generated time.

This source is intended for testing and benchmarking.
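The deterministic numbering can be reproduced with a little arithmetic. The sketch below is plain Python, not Spark code; it assumes 1000 rows per micro-batch, which is what the ranges quoted above imply, and the function name is made up for illustration.

```python
def rows_for_batch(batch_id, rows_per_batch=1000):
    """Return the range of `value`s that a given micro-batch would contain."""
    first = batch_id * rows_per_batch
    return range(first, first + rows_per_batch)


for batch_id in (0, 1):
    values = rows_for_batch(batch_id)
    print(batch_id, values[0], values[-1])
```

Because the range depends only on the batch number, re-running a batch after a failure yields the same rows, which is what makes this source convenient for repeatable tests.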

## Another Example

```bash
./sparkTap.sh structuredstreamingreadfile.py
```

Inside Docker
```bash
docker exec -it pytap /bin/bash

cd /tapvolume
echo "nics,75" > salvo.csv
```

## Operations

You can apply all kinds of operations on streaming DataFrames/Datasets, ranging from untyped, SQL-like operations (e.g. select, where, groupBy) to typed RDD-like operations (e.g. map, filter, flatMap). See the [SQL programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) for more details.


### Basic Operations - Selection, Projection, Aggregation


#### Filters
```bash
./sparkTap.sh structuredstreaming-prime-filter.py
```

#### Add new column
```bash
./sparkTap.sh structuredstreaming-prime-newcol.py
```

#### GroupBy 
```bash
./sparkTap.sh structuredstreaming-prime-groupBy.py
```

#### SQL 
```bash
./sparkTap.sh structuredstreaming-prime-groupByAndSQL.py
```
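The scripts above are not reproduced here. As a hedged sketch of the four kinds of operation, the function below assumes a streaming DataFrame with a numeric column named `value` (such as the one produced by the rate source) and an existing SparkSession `spark`; the function name, the even/odd test and the view name `numbers` are illustrative choices, not taken from those scripts.

```python
def basic_operations(spark, numbers):
    """Apply selection, projection, aggregation and SQL to a streaming DataFrame.

    `numbers` is assumed to have a numeric column called "value".
    """
    # Selection (filter): keep only the rows with an even value.
    evens = numbers.where("value % 2 = 0")
    # Projection: keep every column and add a derived boolean column.
    flagged = numbers.selectExpr("*", "value % 2 = 0 AS is_even")
    # Aggregation: running count of rows per group.
    counts = flagged.groupBy("is_even").count()
    # SQL: register the stream as a temporary view and query it by name.
    numbers.createOrReplaceTempView("numbers")
    sql_counts = spark.sql(
        "SELECT value % 2 = 0 AS is_even, COUNT(*) AS total "
        "FROM numbers GROUP BY value % 2 = 0"
    )
    return evens, flagged, counts, sql_counts
```

Each returned DataFrame is itself a streaming DataFrame, so nothing is computed until one of them is started through writeStream.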

### Window Operations on Event Time


Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let’s understand this with an illustration.

Imagine our quick example is modified and the stream now contains lines along with the time when each line was generated. Instead of running word counts, we want to count words within 10-minute windows, updating every 5 minutes. That is, we count the words received within the 10-minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived at or after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows, 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both the grouping key (i.e. the word) and the window (which can be calculated from the event-time).



![](https://spark.apache.org/docs/latest/img/structured-streaming-window.png)
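To make the window assignment concrete, here is a plain Python sketch (not Spark code) that computes which sliding windows a given event time falls into. It assumes windows are aligned to the Unix epoch and treats each window as including its start and excluding its end; the function name and the date used are made up for illustration.

```python
from datetime import datetime, timedelta


def sliding_windows(event_time, window=timedelta(minutes=10), slide=timedelta(minutes=5)):
    """Return the (start, end) windows that contain `event_time`."""
    epoch = datetime(1970, 1, 1)
    # Start of the latest window that begins at or before the event.
    latest_start = event_time - (event_time - epoch) % slide
    windows = []
    start = latest_start
    # Walk backwards while the window still covers the event.
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


for start, end in sliding_windows(datetime(2024, 1, 1, 12, 7)):
    print(start.time(), "-", end.time())
```

For the 12:07 event this yields the two windows 12:00 - 12:10 and 12:05 - 12:15, matching the description above. In PySpark the same grouping is usually expressed with the `window` function from `pyspark.sql.functions`, given a window duration of "10 minutes" and a slide duration of "5 minutes".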

#### Window

```bash
./sparkTap.sh structured_network_wordcount_windowed.py localhost 9999 10 5
```

```bash
./sparkNC.sh 9999
```

### Join Operations
Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame as well as with another streaming Dataset/DataFrame. The result of the streaming join is generated incrementally, similar to the results of streaming aggregations in the previous section. In this section we will explore what types of joins (i.e. inner, outer, semi, etc.) are supported in the above cases. Note that in all the supported join types, the result of the join with a streaming Dataset/DataFrame will be exactly the same as if it was with a static Dataset/DataFrame containing the same data in the stream.

#### Stream-static Joins
Since its introduction in Spark 2.0, Structured Streaming has supported joins (inner joins and some types of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.

```bash
./sparkTap.sh structuredstreaming-prime-join-static.py
```

#### Stream-stream Joins
In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming Datasets/DataFrames. The challenge of generating join results between two data streams is that, at any point of time, the view of the dataset is incomplete for both sides of the join making it much harder to find matches between inputs. Any row received from one input stream can match with any future, yet-to-be-received row from the other input stream. Hence, for both the input streams, we buffer past input as streaming state, so that we can match every future input with past input and accordingly generate joined results. Furthermore, similar to streaming aggregations, we automatically handle late, out-of-order data and can limit the state using watermarks. Let’s discuss the different types of supported stream-stream joins and how to use them.

#### Inner Joins with optional Watermarking
Inner joins on any kind of columns along with any kind of join conditions are supported. However, as the stream runs, the size of streaming state will keep growing indefinitely as all past input must be saved as any new input can match with any input from the past. To avoid unbounded state, you have to define additional join conditions such that indefinitely old inputs cannot match with future inputs and therefore can be cleared from the state. In other words, you will have to do the following additional steps in the join.

1. Define watermark delays on both inputs such that the engine knows how delayed the input can be (similar to streaming aggregations).

2. Define a constraint on event-time across the two inputs such that the engine can figure out when old rows of one input are not going to be required (i.e. will not satisfy the time constraint) for matches with the other input. This constraint can be defined in one of two ways:

   - Time range join conditions (e.g. ...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR)
   - Join on event-time windows (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow)
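The two steps can be sketched as follows, using a time range join condition. This is an illustration under assumptions rather than a reference implementation: the event-time columns `leftTime` and `rightTime` come from the example condition above, while the key columns `leftId` and `rightId`, the view names and the watermark delays of 2 and 3 hours are invented for the example. `spark` is an existing SparkSession and `left` and `right` are streaming DataFrames.

```python
def join_with_time_range(spark, left, right):
    """Inner-join two streams with watermarks and a time range condition."""
    # Step 1: tell the engine how late each input is allowed to be.
    left.withWatermark("leftTime", "2 hours").createOrReplaceTempView("left_events")
    right.withWatermark("rightTime", "3 hours").createOrReplaceTempView("right_events")
    # Step 2: bound the match by event time so that old rows can be dropped
    # from the join state once they can no longer satisfy the condition.
    return spark.sql(
        """
        SELECT *
        FROM left_events JOIN right_events
          ON leftId = rightId
         AND leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR
        """
    )
```

Without the watermarks and the time bound the join would still be valid, but every past row would have to be kept in state indefinitely.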



## Unsupported Operations

There are some Dataset methods that will not work on streaming Datasets. They are actions that immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be achieved by explicitly starting a streaming query (see the next section regarding that).

- count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.

- foreach() - Instead use ds.writeStream.foreach(...) (see next section).

- show() - Instead use the console sink (see next section).

## Starting Streaming Queries
Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the DataStreamWriter returned through Dataset.writeStream(). You will have to specify one or more of the following in this interface.

- Details of the output sink: Data format, location, etc.

- Output mode: Specify what gets written to the output sink.

- Query name: Optionally, specify a unique name of the query for identification.

- Trigger interval: Optionally, specify the trigger interval. If it is not specified, the system will check for availability of new data as soon as the previous processing has been completed. If a trigger time is missed because the previous processing has not been completed, then the system will trigger processing immediately.

- Checkpoint location: For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing are not covered in these notes; see the programming guide linked at the top.
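Put together, the settings above map onto DataStreamWriter calls roughly as in the sketch below. The function name, the query name `word_counts` and the 10 second trigger are placeholder choices, and `result_df` is assumed to be an aggregated streaming DataFrame, since Complete mode is used.

```python
def start_query(result_df, checkpoint_dir):
    """Start a streaming query with every setting spelled out explicitly."""
    return (
        result_df.writeStream
        .format("console")                             # details of the output sink
        .outputMode("complete")                        # what gets written to the sink
        .queryName("word_counts")                      # optional unique query name
        .trigger(processingTime="10 seconds")          # optional trigger interval
        .option("checkpointLocation", checkpoint_dir)  # where checkpoints are stored
        .start()
    )
```

The call returns as soon as the query has started; the returned object is the handle used to wait for or stop the query.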

### Output Modes

There are a few types of output modes.

- Append mode (default) - Only the new rows added to the Result Table since the last trigger are output to the sink. This is supported only for queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once (assuming a fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. support Append mode.

- Complete mode - The whole Result Table is output to the sink after every trigger. This is supported for aggregation queries.

- Update mode - (Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger are output to the sink.



## Output Sinks

File sink - Stores the output to a directory.
```scala
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
```

Kafka sink - Stores the output to one or more topics in Kafka.
```scala
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
```

Foreach sink - Runs arbitrary computation on the records in the output. See later in the section for more details.
```scala
writeStream
    .foreach(...)
    .start()
```

Console sink (for debugging) - Prints the output to the console/stdout every time there is a trigger. Append, Update and Complete output modes are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory after every trigger.
```scala
writeStream
    .format("console")
    .start()
```

Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.
```scala
writeStream
    .format("memory")
    .queryName("tableName")
    .start()
```

### Start

Note that you have to call start() to actually start the execution of the query. This returns a StreamingQuery object, which is a handle to the continuously running execution. You can use this object to manage the query.

## Foreach and ForeachBatch

foreachBatch(...) allows you to specify a function that is executed on the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.

```python
def foreach_batch_function(df, epoch_id):
    # Transform and write df, the DataFrame holding this micro-batch
    pass
  
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
```

If foreachBatch is not an option (for example, corresponding batch data writer does not exist, or continuous processing mode), then you can express your custom writer logic using foreach. Specifically, you can express the data writing logic by dividing it into three methods: open, process, and close. Since Spark 2.4, foreach is available in Scala, Java and Python.

In Python, you can invoke foreach in two ways: in a function or in an object. The function offers a simple way to express your processing logic but does not allow you to deduplicate generated data when failures cause reprocessing of some input data. For that situation you must specify the processing logic in an object.

```python
def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()
```

```python
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        pass

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method is optional in Python.
        pass
      
query = streamingDF.writeStream.foreach(ForeachWriter()).start()
```

## Streaming Table APIs


Since Spark 3.1, you can also use DataStreamReader.table() to read tables as streaming DataFrames and use DataStreamWriter.toTable() to write streaming DataFrames as tables:



```python
spark = ...  # spark session

# Create a streaming DataFrame
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Write the streaming DataFrame to a table
df.writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .toTable("myTable")

# Check the table result
spark.read.table("myTable").show()

# Transform the source dataset and write to a new table
spark.readStream \
    .table("myTable") \
    .select("value") \
    .writeStream \
    .option("checkpointLocation", "path/to/another/checkpoint/dir") \
    .format("parquet") \
    .toTable("newTable")

# Check the new table result
spark.read.table("newTable").show()
```

Not covered:

- Triggers
- Managing streaming queries
- Monitoring streaming queries
- Reporting metrics programmatically using asynchronous APIs
- Recovering from failures with checkpointing
- Recovery semantics after changes in a streaming query
- Continuous processing
