## Structured Streaming
<https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> 

:::: {.columns}

::: {.fragment .column width="50%"}
Structured Streaming is 

- a scalable
- fault-tolerant 
- **stream processing engine** 
- built on the **Spark SQL** engine. 
::: 

::: {.fragment .column width="50%"}
![](https://dbconvert.com/blog/content/images/size/w2000/2021/11/Data-stream-processing.png)

<https://dbconvert.com/blog/data-stream-processing/>
:::
::::

### Same as batch

:::: {.columns}

::: {.fragment .column width="50%"}
- A streaming computation can be expressed in the same way as a batch computation on static data. 
- The **Spark SQL engine** will run it **incrementally** and **continuously** and
- update the final result as **streaming data** continues to arrive. 
::: 

::: {.fragment .column width="50%"}
![](https://images.ctfassets.net/8vofjvai1hpv/36gl5VFGguEw5PxZizOJVu/b50f5e031efda3635a0d1b912e106243/Database_Streaming.png)
<https://www.confluent.io/learn/data-streaming/>
:::
::::


### with the same programming languages

:::: {.columns}

::: {.fragment .column width="50%"}
The Dataset/DataFrame API, available in Scala, Java, Python and R, lets you express, in addition to what we have already seen:

- streaming aggregations
- event-time windows 
- stream-to-batch joins

::: {.callout-tip .fragment}
The computation is executed on the same optimized Spark SQL engine. 
:::

::: 

::: {.fragment .column width="50%"}
![](https://miro.medium.com/v2/resize:fit:1400/format:webp/1*DsRyFzVjioGKmpoRZW0VTg.png)

<https://mohdizzy.medium.com/leverage-flink-windowing-to-process-streams-based-on-event-time-cdb87e9a1e21> 
:::
::::





### and some guarantees 
:::: {.columns}

::: {.fragment .column width="50%"}
Finally, the system ensures: 

- **end-to-end**
- **exactly-once**
- **fault-tolerance**
- guarantees through **checkpointing** and **Write-Ahead Logs**

::: 

::: {.fragment .column width="50%"}
![](https://dimosr.github.io/assets/img/posts/exactly_once.jpg)

<https://dimosr.github.io/the-tale-of-exactly-once-semantics/>
:::
::::
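The sketch below is a toy model in plain Python, not Spark code. It shows how an offsets write-ahead log, a commit log and an idempotent sink can combine into exactly-once output. `ToyCheckpoint` and `run_query` are hypothetical names, and Spark's real checkpoint directory is more involved. The model also assumes what Spark requires for this guarantee: a replayable source and an idempotent sink. Batch offsets are logged *before* processing. After a crash, the uncommitted batch is therefore replayed with exactly the same input.

```python
class ToyCheckpoint:
    """In-memory stand-in for a checkpoint directory."""

    def __init__(self):
        self.offsets = {}     # offsets log (WAL): batch_id -> (start, end) of the input
        self.commits = set()  # commit log: batch ids whose output reached the sink


def run_query(source, sink, ckpt, batch_size, crash_before_commit=None):
    """Process the list `source` in micro-batches, writing each batch to the
    dict `sink` under its batch id. If `crash_before_commit` is a batch id,
    stop after writing that batch but before committing it (simulated failure)."""
    last = max(ckpt.offsets, default=None)
    if last is None:
        batch_id = 0
    elif last in ckpt.commits:
        batch_id = last + 1
    else:
        batch_id = last  # recovery: replay the uncommitted batch with its logged offsets
    while True:
        if batch_id not in ckpt.offsets:
            start = ckpt.offsets[batch_id - 1][1] if batch_id > 0 else 0
            end = min(start + batch_size, len(source))
            if start == end:
                return  # no new data available
            ckpt.offsets[batch_id] = (start, end)  # 1. log the offsets before processing
        start, end = ckpt.offsets[batch_id]
        sink[batch_id] = source[start:end]  # 2. idempotent write keyed by batch id
        if batch_id == crash_before_commit:
            return  # simulated crash: output written, commit not recorded
        ckpt.commits.add(batch_id)  # 3. mark the batch as committed
        batch_id += 1


source = list(range(7))
sink, ckpt = {}, ToyCheckpoint()
run_query(source, sink, ckpt, batch_size=3, crash_before_commit=1)  # fails during batch 1
run_query(source, sink, ckpt, batch_size=3)                         # restart from the checkpoint
print([x for b in sorted(sink) for x in sink[b]])  # → [0, 1, 2, 3, 4, 5, 6]
```

Batch 1 is written twice, but it is keyed by its batch id. Each input row therefore appears in the sink exactly once.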



### In short

:::: {.columns}

::: {.fragment .column width="50%"}
Structured Streaming provides: 

- fast
- scalable
- fault-tolerant
- end-to-end exactly-once 

stream processing without the user having to reason about streaming.
::: 

::: {.fragment .column width="50%"}
![](https://i.imgflip.com/7kv31f.jpg)
[NicsMeme](https://imgflip.com/i/7kv31f)

A fake meme 
<https://screenrant.com/matrix-meme-what-told-you-not-in-movie/>
:::
::::



### It's micro batch 

:::: {.columns}
::: {.fragment .column width="50%"}
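Queries are processed:

- by default, as a series of small batch jobs (micro-batches), achieving end-to-end latencies as low as 100 ms with exactly-once guarantees
- with **Continuous Processing** (experimental, since Spark 2.3), with latencies as low as 1 ms and at-least-once guarantees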

::: 

::: {.fragment .column width="50%"}
![](https://i.imgflip.com/9d1w9p.jpg)

Inspired by [All that jazz](https://www.youtube.com/watch?v=xFK48dJXN_k)
:::
::::


## Quick Example

::: {.fragment}

Let's run a Netcat server on port 9999

```bash
docker run -it --rm --network=tap --name tapnc  --hostname netcat  splooge/dnsutils nc -vl 9999
```
::: 


::: {.fragment}

And a Spark application:

```bash
docker run --hostname spark --name sparkwc -p 4040:4040 --network tap -it --rm \
-v ./spark/code/:/opt/tap/ \
apache/spark \
/opt/spark/bin/spark-submit /opt/tap/structuredstreamingwc.py
```

::: 

## Programming Model

- **Key idea**: treat a live data stream as a table that is being continuously appended to.
- This leads to a stream processing model very similar to a batch processing model:
- the streaming computation is expressed as a standard batch-like query, as on a static table,
- and Spark runs it as an incremental query on the unbounded input table.

::: {.fragment}
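```python
from pyspark.sql.functions import explode, split

# `lines` is the streaming DataFrame of input lines (one "value" column)
# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
```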
::: 


### Input Table

Each new data item arriving on the stream is like a new row appended to the Input Table.

![](https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png){.fragment .r-stretch} 

### Query 

:::: {.columns}

::: {.fragment .column width="50%"}
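- A query on the input generates the "Result Table".
- At every trigger interval (e.g. every 1 second), new rows are appended to the Input Table, which eventually updates the Result Table.
- Whenever the Result Table is updated, the changed result rows can be written to an external sink.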
::: 

::: {.fragment .column width="50%"}
![](https://spark.apache.org/docs/latest/img/structured-streaming-model.png) 
:::
::::




### Output

The "Output" is what gets written to the external storage. It can be defined in different modes:

- Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

- Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

- Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.

::: {.callout-note .fragment}
Each mode is applicable only to certain types of queries.
:::
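The three modes can be made concrete with a toy model in plain Python. Here a Result Table is a dict from a key (e.g. a word) to a value (e.g. its count). The hypothetical `emit` function returns what a sink would receive at one trigger, given the table before and after that trigger. It models the semantics described above and is not Spark's implementation.

```python
def emit(mode, previous, current):
    """Rows a sink receives at one trigger, given the Result Table before and after it."""
    if mode == "complete":
        # the whole Result Table, every time
        return dict(current)
    if mode == "update":
        # only rows that are new or whose value changed
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # only valid if rows already in the Result Table never change
        if any(current.get(k) != v for k, v in previous.items()):
            raise ValueError("append mode requires existing rows never to change")
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")


before = {"cat": 1, "dog": 1}
after = {"cat": 1, "dog": 2, "owl": 1}
print(emit("complete", before, after))  # all three rows
print(emit("update", before, after))    # only dog and owl
```

With a running count, `emit("append", before, after)` raises an error, because the row for `dog` changed. This is why aggregations without a watermark cannot use Append mode.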



### Quick Example Model

![](https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png)

## Notes 

- Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. 

- It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).

- This model is significantly different from many other stream processing engines. Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once). 

- In this model, Spark is responsible for updating the Result Table when there is new data, thus relieving the users from reasoning about it. 
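The notes above can be illustrated with a toy model in plain Python of the running word count. This is not Spark's API: `incremental_word_count` is a hypothetical name, and each micro-batch is assumed to be a list of newly arrived lines. Only the running counts (the state) survive from one trigger to the next. The processed lines are dropped.

```python
from collections import Counter


def incremental_word_count(micro_batches):
    """Yield the Result Table after each trigger, keeping only running counts as state."""
    state = Counter()                  # intermediate state: count per word
    for new_lines in micro_batches:    # rows appended to the input table since the last trigger
        for line in new_lines:
            state.update(line.split(" "))
        # the processed lines are not kept; only `state` survives to the next trigger
        yield dict(state)


for result_table in incremental_word_count([["cat dog", "dog dog"], ["owl cat"]]):
    print(result_table)
```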

## API

A streaming DataFrame is created from the SparkSession through `spark.readStream`, which returns a `DataStreamReader`:

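```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamingExample").getOrCreate()

# Streaming DataFrame of input lines read from a socket (host/port are examples)
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
```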

### Input Sources

- **File**: reads files written to a directory (text, CSV, JSON, ORC, Parquet) as a stream of data, processing them in order of modification time
- **Kafka**: reads data from Kafka. [See Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)
- **Socket** (for testing purposes): reads UTF-8 text data from a socket connection
- **Rate** (for testing purposes): generates data at the specified number of rows per second; each output row contains a `timestamp` and a `value`
- **Rate Micro-Batch** (for testing purposes): generates data at the specified number of rows per micro-batch; each output row contains a `timestamp` and a `value`. Unlike the Rate source, it provides a consistent set of input rows per micro-batch regardless of query execution (trigger configuration, a lagging query, etc.). For example, batch 0 produces 0-999, batch 1 produces 1000-1999, and so on.
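The difference between the two test sources can be shown with a simplified model in plain Python. It assumes both sources emit a counter starting at 0 and ignores the Rate source's ramp-up option. Both function names are hypothetical. With Rate, the values in a trigger depend on how much time has passed. With Rate Micro-Batch, they depend only on the batch id.

```python
def rate_values(elapsed_before, elapsed_now, rows_per_second):
    """Rate (simplified, no ramp-up): values depend on the time elapsed since start."""
    return list(range(int(elapsed_before * rows_per_second),
                      int(elapsed_now * rows_per_second)))


def rate_micro_batch_values(batch_id, rows_per_batch):
    """Rate Micro-Batch: values depend only on the batch id."""
    start = batch_id * rows_per_batch
    return list(range(start, start + rows_per_batch))


# A slow trigger sees more Rate rows, but always the same Rate Micro-Batch rows
print(len(rate_values(0, 1.5, 100)), len(rate_values(1.5, 2.0, 100)))
print(rate_micro_batch_values(1, 1000)[0], rate_micro_batch_values(1, 1000)[-1])
```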

## Example: Streaming from file

```bash
docker run --hostname spark --name sparkwcr -p 4040:4040 --network tap -it --rm \
-v ./spark/code/:/opt/tap/ apache/spark \
/opt/spark/bin/spark-submit /opt/tap/structuredstreamingreadfile.py
```

Then, from another terminal, create a CSV file inside the container:
```bash
docker exec -it sparkwcr /bin/bash

cd /opt/spark/work-dir
echo "nics,75" > salvo.csv
```
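Conceptually, at each trigger the file source looks for files it has not processed yet and picks them up oldest first. The plain-Python model below sketches that idea. It is not Spark code: `new_files` is a hypothetical helper, and Spark tracks processed files itself rather than in a Python set.

```python
import os
import tempfile


def new_files(directory, seen):
    """Return files in `directory` not processed yet, oldest modification time first."""
    paths = [os.path.join(directory, name) for name in os.listdir(directory)]
    fresh = [p for p in paths if os.path.isfile(p) and p not in seen]
    fresh.sort(key=os.path.getmtime)
    seen.update(fresh)  # remember them so the next trigger skips them
    return fresh


# Usage: each call plays the role of one trigger
with tempfile.TemporaryDirectory() as work_dir:
    seen = set()
    with open(os.path.join(work_dir, "salvo.csv"), "w") as f:
        f.write("nics,75\n")
    first = [os.path.basename(p) for p in new_files(work_dir, seen)]
    second = new_files(work_dir, seen)  # nothing new since the last trigger
    print(first, second)
```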

## Operations

Streaming DataFrames/Datasets support most of the common DataFrame/Dataset operations:

-  untyped, SQL-like operations (e.g. select, where, groupBy)
-  typed RDD-like operations (e.g. map, filter, flatMap; Scala/Java Datasets only)

[SQL programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)

### Basic Operations - Selection, Projection, Aggregation


#### Filters
This example uses the Rate source as input and a UDF to keep only the prime numbers. The console sink prints them to standard output.

:::: {.columns}

::: {.fragment .column width="50%"}
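```python
from math import isqrt

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.appName("PrimeFilter").getOrCreate()

# Python UDF: True when n is a prime number (trial division)
@udf(returnType=BooleanType())
def isprime(n):
    return n is not None and n > 1 and all(n % d for d in range(2, isqrt(n) + 1))

# Create DataFrame reading from Rate Source
df = spark \
    .readStream \
    .format("rate") \
    .option("rowsPerSecond", 100000) \
    .load()

primes = df.filter(isprime('value'))

# Start running the query
# Prints the primes to the console
query = primes \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()
```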
::: 

::: {.fragment .column width="50%"}
```bash
docker run --hostname spark --name spark \
-p 4040:4040 --network tap -it --rm \
-v ./spark/code/:/opt/tap/ apache/spark \
/opt/spark/bin/spark-submit \
/opt/tap/structuredstreaming-prime-filters.py
```
:::
:::: 




#### Add new column
Extends the previous example by adding a new column with the last digit of each prime.

:::: {.columns}

::: {.fragment .column width="50%"}
```python
from pyspark.sql.functions import substring

# isprime is the UDF defined in the previous example
# Create DataFrame reading from Rate Source
df = spark \
    .readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load()

# Let's take the "primes" signals
primes = df.filter(isprime('value'))

# Add a new column with the last digit
primes = primes.withColumn("last", substring("value", -1, 1))

# Start running the query
# and print the primes to the console
query = primes \
    .writeStream \
    .format("console") \
    .start()
```
::: 

::: {.fragment .column width="50%"}
```bash
docker run --hostname spark --name spark \
-p 4040:4040 --network tap -it --rm \
-v ./spark/code/:/opt/tap/ apache/spark \
/opt/spark/bin/spark-submit \
/opt/tap/structuredstreaming-prime-newcol.py
```
:::
::::


#### GroupBy 

Extends the example by grouping by last digit and counting the primes in each group.

:::: {.columns}

::: {.fragment .column width="50%"}
```python
from pyspark.sql.functions import substring

# isprime is the UDF defined in the first example
# Create DataFrame reading from Rate Source
df = spark \
    .readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load()

# Let's take the "primes" signals
primes = df.filter(isprime('value'))

# Add a new column with the last digit
primes = primes.withColumn("last", substring("value", -1, 1))

# Group by last digit and count
primes = primes.groupBy("last").count()

# Start running the query: a streaming aggregation without a
# watermark needs the "complete" (or "update") output mode
query = primes \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
```
::: 

::: {.fragment .column width="50%"}
```bash
docker run --hostname spark --name spark \
-p 4040:4040 --network tap -it --rm \
-v ./spark/code/:/opt/tap/ apache/spark \
/opt/spark/bin/spark-submit \
/opt/tap/structuredstreaming-prime-groupBy.py
```
:::
::::


#### SQL 

Adds another query, written in SQL, that counts the number of incoming signals.

:::: {.columns}

::: {.fragment .column width="50%"}
```python
# Create DataFrame reading from Rate Source
df = spark \
    .readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load()

df.createOrReplaceTempView("signals")
signalsQuery=spark.sql("select count(*) from signals")  
# returns another streaming DF

query2 = signalsQuery \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
```
::: 

::: {.fragment .column width="50%"}
```bash
docker run --hostname spark --name spark \
-p 4040:4040 --network tap -it --rm \
-v ./spark/code/:/opt/tap/ apache/spark \
/opt/spark/bin/spark-submit \
/opt/tap/structuredstreaming-prime-groupByAndSQL.py
```
:::
::::



### Window Operations on Event Time

:::: {.columns}

::: {.fragment .column width="50%"}
Aggregations over a sliding event-time window are very similar to grouped aggregations:

- grouped aggregation: aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column.
- window-based aggregations: aggregate values are maintained for each window the event-time of a row falls into. 

::: 

::: {.fragment .column width="50%"}
![](https://spark.apache.org/docs/latest/img/structured-streaming-window.png){.lightbox}
:::
::::
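A toy model in plain Python can show which windows an event falls into. Event times are integers (e.g. minutes). As in Spark's default, window starts are assumed to be aligned to multiples of the slide. The hypothetical `windowed_counts` conceptually mirrors grouping by a sliding `window(...)` and a word, then counting. With a window longer than the slide, each event is counted in several overlapping windows.

```python
from collections import Counter


def windows_for(event_time, window_len, slide):
    """All sliding windows [start, end) containing event_time; starts are multiples of slide."""
    windows = []
    start = (event_time // slide) * slide  # latest window starting at or before event_time
    while start > event_time - window_len:
        windows.append((start, start + window_len))
        start -= slide
    return sorted(windows)


def windowed_counts(events, window_len, slide):
    """Count (window, word) pairs for a list of (event_time, word) events."""
    counts = Counter()
    for event_time, word in events:
        for window in windows_for(event_time, window_len, slide):
            counts[(window, word)] += 1
    return counts


events = [(7, "cat"), (7, "dog"), (12, "dog"), (13, "owl")]
for (window, word), n in sorted(windowed_counts(events, window_len=10, slide=5).items()):
    print(window, word, n)
```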




#### Window Example

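Start a Netcat server as in the Quick Example, then start the Spark application. The arguments are host, port, window duration and slide duration in seconds: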

```bash
docker run --hostname spark --name spark -p 4040:4040 --network tap -it --rm \
-v ./spark/code/:/opt/tap/ apache/spark \
/opt/spark/bin/spark-submit \
/opt/tap/structured_network_wordcount_windowed.py localhost 9999 10 5
```



### Join Operations

Structured Streaming supports incremental join of a streaming Dataset/DataFrame with:

- static Dataset/DataFrame
- streaming Dataset/DataFrame. 

::: {.callout-note}
In all the supported join types, the result of the join with a streaming Dataset/DataFrame will be exactly the same as if it were with a static Dataset/DataFrame containing the same data as the stream.
:::



#### Stream-static Joins

Since Spark 2.0, Structured Streaming has supported joins (inner joins and some types of outer joins) between a streaming and a static DataFrame/Dataset.

Example: join a Rate stream with a static DataFrame of numbers

```bash
docker run --hostname spark --name spark -p 4040:4040 --network tap -it --rm \
-v ./spark/code/:/opt/tap/ \
-v ./spark/dataset:/tmp/dataset \
apache/spark \
/opt/spark/bin/spark-submit /opt/tap/structuredstreaming-prime-join-static.py
```
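The static side of the join is fully known in advance. Each micro-batch can therefore be joined on its own, and no state needs to be kept between batches. The plain-Python sketch below models this with a hypothetical `stream_static_join` helper. Rows are `(key, value)` pairs and the static table is a dict. It is not Spark code.

```python
def stream_static_join(micro_batches, static_table):
    """Inner-join each micro-batch of (key, value) rows with a static dict.
    Nothing is remembered between batches: the join is stateless."""
    return [
        [(key, value, static_table[key]) for key, value in batch if key in static_table]
        for batch in micro_batches
    ]


static = {2: "two", 3: "three"}
print(stream_static_join([[(1, "a"), (2, "b")], [(3, "c"), (4, "d")]], static))
```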



#### Stream-stream Joins

Since Spark 2.3, Structured Streaming has supported stream-stream joins.

::: {.callout-warning}
The challenge of generating join results between two data streams is that, at any point in time, the view of the dataset is incomplete for both sides of the join, making it much harder to find matches between inputs. 

:::


#### Inner Joins with optional Watermarking

- Inner joins on any kind of columns along with any kind of join conditions are supported.
- However, any new input can match any past input, so without extra information the engine would have to keep all past input as state, which grows without bound. To limit the state, add:

::: {.fragment}
**watermark**: Define watermark delays on both inputs such that the engine knows how delayed the input can be.

::: 

::: {.fragment}
**constraints**: Define a constraint on event-time across the two inputs such that the engine can figure out when old rows of one input are no longer needed for matches with the other input. 

This constraint can be defined in one of two ways: 

1. Time range join conditions (`JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`)
2. Join on event-time windows (`JOIN ON leftTimeWindow = rightTimeWindow`)
:::
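A simplified model in plain Python shows how the watermark and a time-range condition together bound the join state. It is not Spark's implementation. The hypothetical `watermark_join` joins impressions and clicks on ad id with `imp_time <= click_time <= imp_time + range_len`. Several simplifications are assumed:

- a single delay for both streams;
- a global watermark equal to the minimum of the per-stream watermarks (Spark's default policy);
- no future row ever arrives below the watermark (Spark guarantees only that rows within the delay are handled).

```python
def watermark_join(batches, range_len, delay):
    """Toy inner join of impressions and clicks on ad id, with condition
    imp_time <= click_time <= imp_time + range_len.
    `batches` is a list of (impressions, clicks); rows are (ad_id, time)."""
    imp_state, click_state = [], []   # buffered rows of each side
    max_imp = max_click = None        # max event time seen on each side
    results = []

    def matches(imp, click):
        return imp[0] == click[0] and imp[1] <= click[1] <= imp[1] + range_len

    for impressions, clicks in batches:
        # New impressions against all clicks (buffered and new)
        results += [(i[0], i[1], c[1]) for i in impressions
                    for c in click_state + clicks if matches(i, c)]
        # New clicks against buffered impressions only (avoids duplicates)
        results += [(i[0], i[1], c[1]) for c in clicks
                    for i in imp_state if matches(i, c)]
        imp_state += impressions
        click_state += clicks

        for _, t in impressions:
            max_imp = t if max_imp is None else max(max_imp, t)
        for _, t in clicks:
            max_click = t if max_click is None else max(max_click, t)
        if max_imp is None or max_click is None:
            continue  # no watermark until both sides have data
        watermark = min(max_imp, max_click) - delay

        # Future clicks have time >= watermark: an impression whose range
        # ends before the watermark can never match again
        imp_state = [i for i in imp_state if i[1] + range_len >= watermark]
        # Future impressions have time >= watermark: an earlier click
        # can never match again
        click_state = [c for c in click_state if c[1] >= watermark]
    return results, len(imp_state), len(click_state)


joined, n_imp, n_click = watermark_join([
    ([("a", 0), ("b", 5)], [("a", 3)]),
    ([("c", 30)], [("b", 12)]),
    ([], [("c", 35)]),
], range_len=10, delay=2)
print(joined, n_imp, n_click)
```

All three matches are found. At the end, only one buffered row per side remains in the state.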






##### Example of Stream-Stream join

Example of a stream-stream join on ad data using watermarking

```bash
docker run --hostname spark --name spark -p 4040:4040 --network tap -it --rm \
-v ./spark/code/:/opt/tap/ \
apache/spark \
/opt/spark/bin/spark-submit \
/opt/tap/structuredstreaming-join-streams-ad.py
```

## Starting Streaming Queries

To start the streaming computation, use the DataStreamWriter ([Python docs](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter)) returned by `DataFrame.writeStream` and specify:

- Details of the output sink: Data format, location, etc.
- Output mode: Specify what gets written to the output sink.
- Query name: Optionally, specify a unique name of the query for identification.
- Trigger interval: Optionally, specify the trigger interval. 
- Checkpoint location: For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. 

### Output Modes

There are a few types of output modes.

- Append mode (default) - Only the new rows added to the Result Table since the last trigger are output to the sink. This is supported only for queries where rows added to the Result Table never change, so each row is output only once (assuming a fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. support Append mode.

- Complete mode - The whole Result Table is output to the sink after every trigger. This is supported for aggregation queries.

- Update mode - (Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger are output to the sink.



### Output Sinks

:::: {.columns}

::: {.fragment .column width="50%"}
**File sink** - Stores the output to a directory.
```scala
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .start()
```
::: 

::: {.fragment .column width="50%"}
**Kafka sink** - Stores the output to one or more topics in Kafka.
```scala
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .start()
```
:::

::: {.fragment .column width="50%"}
**Foreach sink** - Runs arbitrary computation on the records in the output. See later in the section for more details.
```scala
writeStream
.foreach(...)
.start()
```
:::

::: {.fragment .column width="50%"}
**Console sink** (for debugging) - Prints the output to the console/stdout every time there is a trigger.
```scala
writeStream
    .format("console")
    .start()
```
:::
::::

::: {.callout-note}
Call `start()` to actually start executing the query. It returns a `StreamingQuery` object, a handle to the continuously running execution that can be used to manage the query.
:::


#### For each/for each batch

:::: {.columns}

::: {.fragment .column width="50%"}
- ```foreachBatch(...)``` defines a function that is executed on the output data of every micro-batch of a streaming query. 
- The function takes two parameters: a DataFrame or Dataset with the output data of a micro-batch, and the unique ID of the micro-batch.

::: {.fragment}
```python
def foreach_batch_function(df, epoch_id):
    # Transform and write df (the output of one micro-batch)
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
```
:::

::: {.callout-note .fragment}
If foreachBatch is not an option (for example, no batch data writer exists for the target system, or the query runs in continuous processing mode), you can express your custom writer logic using foreach. 

:::

::: 

::: {.fragment .column width="50%"}
1. **function**: the simplest way to express the processing logic, but it does not allow deduplicating generated data when failures cause some input data to be reprocessed.
   
::: {.fragment}
```python
def process_row(row):
    # Write row to storage
    pass
query = streamingDF.writeStream.foreach(process_row).start()
```
:::
2. **object**: a class that can define the methods `open`, `process` and `close`

::: {.fragment}
```python
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        pass

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method is optional in Python.
        pass
      
query = streamingDF.writeStream.foreach(ForeachWriter()).start()
```
:::
:::
::::
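The object form can deduplicate because `open` receives the `(partition_id, epoch_id)` pair. In micro-batch mode, a replayed partition with the same pair carries the same data. The plain-Python toy below sketches one way to use this. `DedupWriter` and `run_partition` are hypothetical names, and `run_partition` only mimics the calling order: `open`, then `process` for each row if `open` returned True, then `close`. In real Spark the writer is copied to the executors, so the set of written pairs would have to live in the external store itself.

```python
class DedupWriter:
    """Toy foreach writer that skips (partition_id, epoch_id) pairs already written."""

    def __init__(self, store, committed):
        self.store = store          # stand-in for the external storage
        self.committed = committed  # (partition_id, epoch_id) pairs already written
        self.key = None
        self.buffer = []

    def open(self, partition_id, epoch_id):
        self.key = (partition_id, epoch_id)
        self.buffer = []
        # False means: skip processing this partition (already written)
        return self.key not in self.committed

    def process(self, row):
        self.buffer.append(row)

    def close(self, error):
        # write only on success, and only once per (partition_id, epoch_id)
        if error is None and self.key not in self.committed:
            self.store.extend(self.buffer)
            self.committed.add(self.key)


def run_partition(writer, partition_id, epoch_id, rows):
    """Mimics the open/process/close calling sequence for one partition."""
    if writer.open(partition_id, epoch_id):
        for row in rows:
            writer.process(row)
    writer.close(None)


store = []
writer = DedupWriter(store, set())
run_partition(writer, 0, 1, ["a", "b"])
run_partition(writer, 0, 1, ["a", "b"])  # replay after a failure: skipped
run_partition(writer, 1, 1, ["c"])
print(store)
```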

#### Streaming Table APIs
Since Spark 3.1, you can also use DataStreamReader.table() to read tables as streaming DataFrames and use DataStreamWriter.toTable() to write streaming DataFrames as tables:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamingTables").getOrCreate()

# Create a streaming DataFrame
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Write the streaming DataFrame to a table
# (each streaming query needs its own checkpoint directory)
df.writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir1") \
    .toTable("myTable")

# Check the table result (may be empty until the first micro-batch commits)
spark.read.table("myTable").show()

# Transform the source dataset and write to a new table
spark.readStream \
    .table("myTable") \
    .select("value") \
    .writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir2") \
    .format("parquet") \
    .toTable("newTable")

# Check the new table result
spark.read.table("newTable").show()
```

Not covered here:

- Triggers
- Managing streaming queries
- Monitoring streaming queries
- Reporting metrics programmatically using asynchronous APIs
- Recovering from failures with checkpointing
- Recovery semantics after changes in a streaming query
- Continuous processing
