## Structured Streaming

### Overview

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine.

You can express a streaming computation the same way you would express a batch computation on static data.

The Spark SQL engine runs it incrementally and continuously, updating the final result as streaming data continues to arrive.

End-to-end exactly-once fault-tolerance is guaranteed through checkpointing and Write-Ahead Logs.

In short, Structured Streaming provides stream processing that is:
- fast
- scalable
- fault-tolerant
- end-to-end exactly-once

## Quick Example

```bash
./sparkNC.sh 9999
```

```bash
./sparkTap.sh structuredstreamingwc.py
```

## Let's see the code

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

# Create a Spark Session 

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()


# Create DataFrame representing the stream of input lines from connection to tapnc:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "tapnc") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
```

## Programming Model

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. 

This leads to a new stream processing model that is very similar to a batch processing model. 

You express your streaming computation as a standard batch-like query, as if it ran on a static table, and Spark runs it as an incremental query on the unbounded input table.
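To make the "unbounded input table" idea concrete, here is a minimal pure-Python model of the word-count query from the quick example. It is not Spark code, and `word_count_batch` and `IncrementalWordCount` are hypothetical names. The batch function recomputes counts over the whole table. The incremental version keeps the running counts as state and folds in only the rows appended since the last trigger. The loop checks that both give the same Result Table after every trigger.

```python
from collections import Counter


def word_count_batch(lines):
    """Batch query: count words over a static table of lines."""
    counts = Counter()
    for line in lines:
        counts.update(line.split(" "))
    return dict(counts)


class IncrementalWordCount:
    """Runs the same query incrementally over an unbounded input table.

    Instead of rescanning every row on each trigger, it keeps the running
    counts as state and processes only the rows appended since the last trigger.
    """

    def __init__(self):
        self.input_table = []   # every row ever appended (kept only to check the model)
        self.state = Counter()  # running counts, i.e. the Result Table

    def trigger(self, new_lines):
        self.input_table.extend(new_lines)
        for line in new_lines:
            self.state.update(line.split(" "))
        return dict(self.state)


query = IncrementalWordCount()
for batch in [["cat dog", "dog"], ["owl cat"], ["dog"]]:
    result = query.trigger(batch)
    # The incremental result equals the batch query over everything seen so far
    assert result == word_count_batch(query.input_table)

print(result)  # → {'cat': 2, 'dog': 3, 'owl': 1}
```

Spark does the same thing at scale: the running counts are the state it checkpoints, and the query you write stays the batch-style `groupBy("word").count()`.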

## Input Table

Every data item arriving on the stream is like a new row being appended to the Input Table.

![](https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png)

## Query 

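A query on the input generates a "Result Table".

Every trigger interval (for example, every 1 second), new rows are appended to the Input Table, which eventually updates the Result Table.

Whenever the Result Table is updated, the changed result rows can be written to an external sink.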

![](https://spark.apache.org/docs/latest/img/structured-streaming-model.png)

## Output

The Output is what gets written to external storage. It can be defined in one of three modes:

- Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

- Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only to queries where existing rows in the Result Table are not expected to change.

- Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
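The difference between Complete and Update mode is easiest to see by tracking what each trigger would emit for a running count. The sketch below is a pure-Python model, not the Spark API. `emit` is a hypothetical helper that takes the Result Table before and after a trigger and returns the rows each mode would write. Append mode is left out because the counts in an aggregation keep changing, which is exactly the case where, per the definition above, Append mode does not apply.

```python
def emit(previous, current, mode):
    """Return the rows a sink would receive after one trigger.

    previous/current: the Result Table (word -> count) before and after the trigger.
    """
    if mode == "complete":
        # The entire updated Result Table is written
        return dict(current)
    if mode == "update":
        # Only rows that are new or whose value changed since the last trigger
        return {k: v for k, v in current.items() if previous.get(k) != v}
    raise ValueError(f"mode not modelled for this aggregation: {mode}")


before = {"cat": 1, "dog": 2}
after = {"cat": 2, "dog": 2, "owl": 1}  # after the trigger that read "owl cat"

print(emit(before, after, "complete"))  # → {'cat': 2, 'dog': 2, 'owl': 1}
print(emit(before, after, "update"))    # → {'cat': 2, 'owl': 1}
```

The unchanged `dog` row is rewritten in Complete mode but not in Update mode, which is why Update mode usually sends much less data to the sink.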

## Quick Example Model

![](https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png)

## API

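### Create a streaming DataFrame/Dataset

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

# spark.readStream returns a DataStreamReader; load() returns a streaming DataFrame
# (here, the lines read from the socket used in the quick example)
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "tapnc") \
    .option("port", 9999) \
    .load()
```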

## Input Sources

- File source - Reads files written in a directory as a stream of data. Files will be processed in the order of file modification time. If latestFirst is set, the order will be reversed. Supported file formats are text, CSV, JSON, ORC and Parquet. See the docs of the DataStreamReader interface for a more up-to-date list and the supported options for each file format. Note that the files must be atomically placed in the given directory, which, in most file systems, can be achieved by file move operations.

- Kafka source - Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher. See the Kafka Integration Guide for more details.

- Socket source (for testing) - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.

- Rate source (for testing) - Generates data at the specified number of rows per second. Each output row contains a `timestamp` and a `value`: `timestamp` is of Timestamp type and holds the time of message dispatch, and `value` is of Long type and holds the message count, starting from 0 for the first row. This source is intended for testing and benchmarking.
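The file source's ordering rule can be modelled in a few lines of standard-library Python. This only illustrates the documented behaviour (new files are picked up in modification-time order, reversed when `latestFirst` is set). It is not Spark's implementation. The names `next_batch` and `seen` are hypothetical, and the `max_files` cap is an assumption added to mirror the file source's `maxFilesPerTrigger` option.

```python
import os
import tempfile


def next_batch(directory, seen, latest_first=False, max_files=None):
    """Pick the files the next micro-batch would read from `directory`.

    Files already in `seen` are skipped; new files are ordered by modification
    time (newest first when latest_first is True) and capped at max_files.
    """
    paths = [os.path.join(directory, name) for name in os.listdir(directory)]
    new_files = [p for p in paths if os.path.isfile(p) and p not in seen]
    new_files.sort(key=os.path.getmtime, reverse=latest_first)
    if max_files is not None:
        new_files = new_files[:max_files]
    seen.update(new_files)  # remember what has been processed
    return [os.path.basename(p) for p in new_files]


# Usage: three files with increasing modification times
with tempfile.TemporaryDirectory() as d:
    for i, name in enumerate(["a.csv", "b.csv", "c.csv"]):
        path = os.path.join(d, name)
        with open(path, "w") as f:
            f.write("nics,75\n")
        os.utime(path, (1_000 + i, 1_000 + i))  # force a deterministic mtime order
    seen = set()
    first = next_batch(d, seen, max_files=2)                # ['a.csv', 'b.csv']
    second = next_batch(d, seen)                            # ['c.csv']
    newest_first = next_batch(d, set(), latest_first=True)  # ['c.csv', 'b.csv', 'a.csv']
```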

## Another Example

```bash
./sparkTap.sh structuredstreamingreadfile.py
```

Inside Docker
```bash
cd /tapvolume
echo "nics,75" > salvo.csv
```
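`echo ... > salvo.csv` first creates the file and then writes to it, so a file source watching that directory could, in principle, pick up a half-written file. The file source note above asks for files to be placed atomically, for example with a move. Below is a minimal standard-library sketch. It assumes that the watched directory and a staging directory live on the same POSIX file system, which is what makes `os.replace` an atomic rename. `drop_file_atomically` and the staging directory are hypothetical names, not part of the tap scripts.

```python
import os
import tempfile


def drop_file_atomically(text, watched_dir, filename, staging_dir):
    """Write `text` to staging_dir first, then move it into watched_dir in one step.

    os.replace is an atomic rename on POSIX when both paths are on the same
    file system, so a reader of watched_dir sees either no file or the whole file.
    """
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(text)
        target = os.path.join(watched_dir, filename)
        os.replace(tmp_path, target)
    except BaseException:
        # Do not leave a stray temporary file behind on failure
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return target


# Usage with throwaway directories standing in for the watched directory and its staging area
with tempfile.TemporaryDirectory() as root:
    watched = os.path.join(root, "watched")
    staging = os.path.join(root, "staging")
    os.mkdir(watched)
    os.mkdir(staging)
    path = drop_file_atomically("nics,75\n", watched, "salvo.csv", staging)
    with open(path) as f:
        content = f.read()
    watched_files = os.listdir(watched)
    staged_left = os.listdir(staging)
```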

## Operations

You can apply all kinds of operations on streaming DataFrames/Datasets, ranging from untyped, SQL-like operations (e.g. select, where, groupBy) to typed RDD-like operations (e.g. map, filter, flatMap). See the [SQL programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) for more details.

## Unsupported Operations

There are some Dataset methods that will not work on streaming Datasets. They are actions that would immediately run queries and return results, which does not make sense on a streaming Dataset. Instead, these functionalities can be achieved by explicitly starting a streaming query (see the next section).

- count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.

- foreach() - Instead use ds.writeStream.foreach(...) (see next section).

- show() - Instead use the console sink (see next section).

## Output Sinks

File sink - Stores the output to a directory.
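```scala
// df is a streaming DataFrame; the file sink also requires a checkpoint location
df.writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .start()
```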

Kafka sink - Stores the output to one or more topics in Kafka.
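```scala
// df must contain a "value" column (and optionally a "key" column) to be written to Kafka
df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .start()
```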

Foreach sink - Runs arbitrary computation on the records in the output. See later in the section for more details.
```scala
df.writeStream
    .foreach(...)
    .start()
```

Console sink (for debugging) - Prints the output to the console/stdout every time there is a trigger. Append, Update and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver’s memory after every trigger.
```scala
df.writeStream
    .format("console")
    .start()
```

Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.
```scala
// The query name becomes the name of the in-memory table
df.writeStream
    .format("memory")
    .queryName("tableName")
    .start()

// Query the in-memory table with Spark SQL
spark.sql("select * from tableName").show()
```

### Start

Note that you have to call start() to actually start the execution of the query. This returns a StreamingQuery object, which is a handle to the continuously running execution. You can use this object to manage the query.

## Foreach and ForeachBatch

foreachBatch(...) allows you to specify a function that is executed on the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.

```python
def foreach_batch_function(df, epoch_id):
    # Transform and write df (the output data of one micro-batch)
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
```
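By default, `foreachBatch` gives at-least-once write guarantees. After a failure, a micro-batch can be passed to the function again with the same `epoch_id`, and the Spark documentation suggests using that ID to deduplicate the output. The sketch below makes some assumptions. `make_idempotent` and `write_fn` are hypothetical names. The in-memory set stands in for a durable record of committed IDs. In a real job, that record would have to live in the target store and be updated together with the data, because a driver-side set is lost on restart. The wrapper only relies on being called as `(df, epoch_id)`, so it can be exercised here with a plain list in place of a DataFrame.

```python
def make_idempotent(write_fn, committed_ids):
    """Wrap write_fn(df, epoch_id) so epoch_ids already recorded as committed are skipped."""

    def foreach_batch_function(df, epoch_id):
        if epoch_id in committed_ids:
            # This micro-batch was already written before a failure/restart; skip it
            return
        write_fn(df, epoch_id)
        committed_ids.add(epoch_id)

    return foreach_batch_function


# Usage with plain lists standing in for micro-batch DataFrames
written = []
committed = set()
fn = make_idempotent(lambda df, epoch_id: written.extend(df), committed)
fn(["a", "b"], 0)
fn(["c"], 1)
fn(["c"], 1)  # replayed batch after a simulated restart: ignored
print(written)  # → ['a', 'b', 'c']
```

In a streaming job, the wrapped function would be passed as `streamingDF.writeStream.foreachBatch(make_idempotent(my_writer, committed)).start()`, where `my_writer` is a hypothetical batch writer.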

If foreachBatch is not an option (for example, when a corresponding batch data writer does not exist, or in continuous processing mode), then you can express your custom writer logic using foreach. Specifically, you can express the data writing logic by dividing it into three methods: open, process, and close. Since Spark 2.4, foreach is available in Scala, Java and Python.

In Python, you can invoke foreach in two ways: in a function or in an object. The function offers a simple way to express your processing logic but does not allow you to deduplicate generated data when failures cause reprocessing of some input data. For that situation you must specify the processing logic in an object.

```python
def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()
```

```python
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        # Return True so that process() is called; if open() returns a falsy
        # value (such as None from a bare `pass`), the rows are skipped.
        return True

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method is optional in Python.
        pass

query = streamingDF.writeStream.foreach(ForeachWriter()).start()
```
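Here is why the object form allows deduplication. `open` receives `partition_id` and `epoch_id`, which together identify one partition of one micro-batch in micro-batch mode. Returning `False` from `open` makes Spark skip `process` for that partition, while `close` is still called. The sketch below assumes a hypothetical transactional store, stood in for by the class-level dict `_committed`. A real writer runs on executors, so that state would have to live in the external system. The writer buffers rows and commits them in `close` only if no error occurred. `run_partition` is a hypothetical, simplified stand-in for the call sequence Spark performs.

```python
class DedupForeachWriter:
    """Row writer that commits each (partition_id, epoch_id) at most once.

    _committed stands in for a hypothetical external transactional store.
    """

    _committed = {}  # (partition_id, epoch_id) -> list of rows

    def open(self, partition_id, epoch_id):
        self.key = (partition_id, epoch_id)
        self.buffer = []
        # Returning False makes Spark skip process() for this partition and epoch
        return self.key not in self._committed

    def process(self, row):
        self.buffer.append(row)

    def close(self, error):
        # close() runs even when open() returned False; only commit new, error-free work
        if error is None and self.key not in self._committed:
            self._committed[self.key] = self.buffer


def run_partition(writer, partition_id, epoch_id, rows, error=None):
    """Mimic the open/process/close sequence for one partition (simplified)."""
    if writer.open(partition_id, epoch_id):
        for row in rows:
            writer.process(row)
    writer.close(error)


run_partition(DedupForeachWriter(), 0, 5, ["a", "b"])
run_partition(DedupForeachWriter(), 0, 5, ["a", "b"])  # retried task: skipped
run_partition(DedupForeachWriter(), 1, 5, ["c"], error=RuntimeError("task failed"))
print(DedupForeachWriter._committed)  # → {(0, 5): ['a', 'b']}
```

In a query, it would be used like the plain object above: `streamingDF.writeStream.foreach(DedupForeachWriter()).start()`.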

## Streaming Table APIs


Since Spark 3.1, you can also use DataStreamReader.table() to read tables as streaming DataFrames and use DataStreamWriter.toTable() to write streaming DataFrames as tables:



```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # spark session

# Create a streaming DataFrame
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Write the streaming DataFrame to a table
df.writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir/myTable") \
    .toTable("myTable")

# Check the table result
spark.read.table("myTable").show()

# Transform the source dataset and write to a new table.
# Each running query needs its own checkpoint location.
spark.readStream \
    .table("myTable") \
    .select("value") \
    .writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir/newTable") \
    .format("parquet") \
    .toTable("newTable")

# Check the new table result
spark.read.table("newTable").show()
```

Not covered:
- Triggers
- Managing Streaming Queries
- Monitoring Streaming Queries
- Reporting Metrics programmatically using Asynchronous APIs
- Recovering from Failures with Checkpointing
- Recovery Semantics after Changes in a Streaming Query
- Continuous Processing
