# Unit 8 Spark Streaming

## Contents
```
8.1. Introduction to Stream Processing with Spark
  8.1.1. Spark Streaming API (DStream)
  8.1.2. Structured Streaming API
  8.1.3. Stream Processing Model
  8.1.4. Input Sources
  8.1.5. Output Sinks
  8.1.6. Output Mode
  8.1.7. Fault Tolerance and Restarts
  8.1.8. Typical structure of a Spark Streaming application
  
8.2. Windowing and Aggregates
  8.2.1. Stateless vs Stateful Transformations
  8.2.2. Event Time and Windowing    
  8.2.3. Tumbling Window
  8.2.4. Sliding Window
  8.2.5. Watermarking
  
8.3. Joins
  8.3.1. Stream-Static Joins
  8.3.2. Stream-Stream Joins
  
```

## 8.1. Introduction to Stream Processing with Spark

Let's start by reviewing how Spark operates in standard batch processing mode:

![Standard batch processing operation](https://bigdata.cesga.es/img/spark_streaming-non_streaming_operation.png)

In batch mode, we have an input data source, we apply some transformations, and we write the output to the given storage.

When processing a streaming data source we have to introduce a new axis, **time**, because the input source keeps generating new data as time passes.

![Microbatches](https://bigdata.cesga.es/img/spark_streaming-microbatch_generation.png)

In stream processing mode, Spark divides the input data stream into micro-batches and then processes each micro-batch as a series of small jobs.
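
To make the micro-batch idea concrete, here is a minimal plain-Python sketch. It does not use Spark, and the function name `split_into_micro_batches`, the sample events and the 5-second trigger interval are all assumptions made for illustration. It groups records by the trigger interval in which they arrive, which is roughly what happens before each small job runs.

```python
from collections import defaultdict


def split_into_micro_batches(events, trigger_seconds):
    """Group (arrival_second, payload) pairs by the trigger interval they arrive in."""
    batches = defaultdict(list)
    for arrival_second, payload in events:
        # All records arriving within the same interval share a batch id.
        batch_id = arrival_second // trigger_seconds
        batches[batch_id].append(payload)
    # Return the batches in the order in which they would be processed.
    return [batches[batch_id] for batch_id in sorted(batches)]


# Hypothetical stream: (arrival time in seconds, payload)
events = [(0, "a"), (1, "b"), (4, "c"), (5, "d"), (11, "e")]
micro_batches = split_into_micro_batches(events, trigger_seconds=5)
print(micro_batches)
```

Each inner list stands for one micro-batch that would be handed to the engine as a small job.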

### 8.1.1. Spark Streaming API (DStream)

The Spark Streaming API, also known as DStream, is the implementation of Spark Streaming based on RDDs. You can still find it in legacy projects, but for new projects the newer Structured Streaming API is recommended.

NOTE: Spark Streaming (DStream) no longer receives updates.

### 8.1.2. Structured Streaming API

The Structured Streaming API is the newer streaming API built on the Spark SQL engine, i.e. the DataFrame API.

Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

The idea behind both Spark Streaming and Structured Streaming is to divide the stream of data into **micro-batches** and process each micro-batch as a small job, achieving end-to-end latencies as low as 100 milliseconds.

To achieve lower latencies, there is also a processing mode called **Continuous Processing**, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees.

The Spark SQL engine (Catalyst) takes care of running the series of jobs incrementally and continuously updating the final result as streaming data continues to arrive.

The system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs.

### 8.1.3. Stream Processing Model

The key idea behind Spark Structured Streaming is to treat the live data stream as a table that is being continuously appended to.

![Unbounded table](https://bigdata.cesga.es/img/spark_streaming-unbounded_table.png)

### 8.1.4. Input Sources

- Socket source (for testing)

In [None]:
spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

- Rate source (for testing and benchmarking)

In [None]:
spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 2) \
    .load()

You can also specify the number of partitions to simulate:

In [None]:
spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 2) \
    .option("numPartitions", 2) \
    .load()

- File source

In [None]:
spark.readStream \
  .format("json") \
  .option("path", "path/to/source/dir") \
  .load()

The format can be `parquet`, `json`, `csv`, `orc`, etc.

- Kafka source

In [None]:
spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()

We can choose where to start in the stream with the `startingOffsets` option:
```python
.option("startingOffsets", "earliest")
```

`earliest` starts from the earliest offsets of the topic partitions; `latest` starts from the latest offsets.

We can also use a JSON string specifying a starting offset for each TopicPartition. In this case, use `-2` to refer to earliest and `-1` to refer to latest.
```python
.option("startingOffsets", """{"topic1": {"0": 100, "1": -2}, "topic2": {"0": -2}}""")
```

This only applies when a new query is started; a resumed query always picks up from where it left off. Partitions newly discovered during a query start at earliest.
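
The per-partition JSON string for `startingOffsets` is easy to get wrong when written by hand. As a small sketch, it can be built from a dictionary with the standard `json` module; the topic names and offsets below simply mirror the example above, and the `EARLIEST`/`LATEST` constants are names introduced here for readability.

```python
import json

# Special offset values described above (the constant names are our own).
EARLIEST, LATEST = -2, -1

# Topic -> partition -> starting offset, mirroring the example above.
starting_offsets = {
    "topic1": {"0": 100, "1": EARLIEST},
    "topic2": {"0": EARLIEST},
}

starting_offsets_json = json.dumps(starting_offsets)
print(starting_offsets_json)
```

The resulting string can then be passed as `.option("startingOffsets", starting_offsets_json)`.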

### Exercises

Lab 1: [Unit_8_structured_streaming-dataframe_schema.py](exercises/Unit_8_structured_streaming-dataframe_schema.py)
- Review the code
- Run the app using:
```
    spark-submit Unit_8_structured_streaming-dataframe_schema.py bigdata.cesga.es 80
```    
- What is the schema of the dataframe that is generated from the stream?

Lab 2: [Unit_8_input_source_rate.py](exercises/Unit_8_input_source_rate.py)
- Check how the rate input source works. This source is very useful for testing.
- Run the app using:
```
    spark-submit Unit_8_input_source_rate.py
```   
- Experiment with the rowsPerSecond and numPartitions options.

Lab 3: [Unit_8_input_source_json_files.py](exercises/Unit_8_input_source_json_files.py)
- Review the code
- Create the news directory and upload news1.json file
```
hdfs dfs -mkdir news
hdfs dfs -put news1.json news
```
- Run the app using
```
spark-submit Unit_8_input_source_json_files.py news
```
- Keep the app running and upload a new JSON file to the same directory
```
hdfs dfs -put news2.json news
```
- Keep the app running and upload a final JSON file to the same directory
```
hdfs dfs -put news3.json news
```

### 8.1.5. Output Sinks

- File sink: stores the output to a directory

In [None]:
df.writeStream \
    .format("parquet") \
    .option("path", "path/to/destination/dir") \
    .start()

The format can be `parquet`, `json`, `csv`, `orc`, etc.

- Kafka sink: stores the output to one or more topics in Kafka

In [None]:
df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "orders") \
    .start()

The DataFrame being written to Kafka should have the following columns in its schema:
- `key` (optional) of type string or binary
- `value` (required) of type string or binary
- `topic` (optional) of type string

The `value` column is the only required column.

The `topic` column is required if the `topic` configuration option is not specified. If a `topic` column exists, its value is used as the topic when writing the given row to Kafka. If both the `topic` column and the `topic` option are specified, the option overrides the column.

If a `key` column is not specified, a `null`-valued `key` column is added automatically.


In [None]:
kafka_df = products.selectExpr(
    "cast(order_id as string) as key",
    """to_json(
        named_struct(
            'order_id', order_id,
            'product_id', product_id,
            'count', count
        )
    ) as value""")

- Console sink (for debugging): prints the output to stdout every time there is a trigger

In [None]:
df.writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()

The `truncate` option controls whether the output is truncated when the content of a cell is too long (`true` by default).

- Memory sink (for debugging):  stores the output in the memory of the driver

In [None]:
df.writeStream \
    .format("memory") \
    .queryName("mytable") \
    .start()

We can then interactively query the in-memory table `mytable`.

- ForeachBatch: runs custom write logic on every micro-batch of the output

In [None]:
def foreach_batch_function(df, epoch_id):
    # Custom function that transforms and writes df to storage
    pass
  
df.writeStream \
    .foreachBatch(foreach_batch_function) \
    .start()

- Foreach: runs custom write logic on every row of the output

In [None]:
def process_row(row):
    # Custom function that writes row to storage
    pass
    
df.writeStream \
    .foreach(process_row) \
    .start()

### 8.1.6. Output Mode

- *Append Mode*: only the new rows will be written.

In [None]:
df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

- *Update Mode*: only new and updated rows will be written.

In [None]:
df.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

- *Complete Mode*: the entire updated result table will be written.

In [None]:
df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

Complete mode is only supported when the query contains streaming aggregations, since it would be infeasible to keep all unaggregated data in the result table.

In complete mode all aggregated data has to be preserved, so be careful about how much the result table grows.
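
The difference between the three modes is easier to see on a tiny example. The following plain-Python sketch is only a simulation, not Spark code: the function `simulate_output` and the sample batches are hypothetical. Append mode is shown for a stateless pass-through query, while update and complete modes are shown for a running word count.

```python
from collections import Counter


def simulate_output(batches, mode):
    """Return what would be written after each micro-batch for the given mode."""
    if mode not in ("append", "update", "complete"):
        raise ValueError(f"unknown output mode: {mode}")
    result_table = Counter()  # running word count kept across micro-batches
    written = []
    for batch in batches:
        if mode == "append":
            # Stateless pass-through: only the rows that arrived in this batch.
            written.append(list(batch))
            continue
        result_table.update(batch)
        if mode == "update":
            # Only the counts that are new or changed in this batch.
            written.append({word: result_table[word] for word in set(batch)})
        else:
            # Complete: the entire result table, every time.
            written.append(dict(result_table))
    return written


batches = [["spark", "kafka"], ["spark"]]
for mode in ("append", "update", "complete"):
    print(mode, simulate_output(batches, mode))
```

Note how the complete-mode output keeps repeating `kafka` even though its count did not change in the second micro-batch, which is why the size of the result table matters in that mode.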

### 8.1.7. Fault Tolerance and Restarts

Stream source operation modes:
- **At most once**: a row of data is delivered to the application at most once. It may not be delivered at all, so data can be lost.
- **At least once**: a row of data is delivered to the application at least once. No data is ever lost but it could be delivered several times, so data can be duplicated. 
- **Exactly once**: a row of data is guaranteed to be delivered exactly once. No data loss, no duplicated records.

Restarts:
- The ability to restart a streaming app from where it left off

**To achieve fault tolerance and restarts, Spark depends on the functionality of the underlying input source.** The streaming source should support offsets (like Kafka) so that the position in the stream can be tracked and set. Spark uses checkpointing and write-ahead logs to record the offsets of the stream being processed in each trigger.

The output sinks in Spark Structured Streaming are designed to be idempotent, so they can handle reprocessing correctly. This depends on the sink, however: the Kafka sink, for example, only provides at-least-once write semantics, so replayed batches can produce duplicate records.

To create a streaming app with Spark Structured Streaming that can achieve **exactly-once** semantics we have to:
- Use a replayable source (like Kafka)
- Use checkpointing
- Use deterministic computation (same input -> same output, no randomness or dependency on processing time)
- Use an idempotent sink (one that can handle duplicates)
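
As an illustration of the last requirement, here is a minimal plain-Python sketch of an idempotent sink. The class `IdempotentSink` is hypothetical and keeps everything in memory; a real sink would have to store the committed batch ids durably and together with the data. The batch id plays a role similar to the `epoch_id` argument passed to the `foreachBatch` function shown earlier.

```python
class IdempotentSink:
    """In-memory sink that ignores micro-batches it has already committed."""

    def __init__(self):
        self.committed_batches = set()
        self.rows = []

    def write(self, batch_id, rows):
        """Write a micro-batch once; return False if it was a replay."""
        if batch_id in self.committed_batches:
            return False  # already written: skip to avoid duplicates
        self.rows.extend(rows)
        self.committed_batches.add(batch_id)
        return True


sink = IdempotentSink()
sink.write(0, ["a", "b"])
sink.write(1, ["c"])
# After a simulated failure and restart, batch 1 is replayed from the source.
replayed_written = sink.write(1, ["c"])
print(sink.rows, replayed_written)
```

Because the replayed batch is recognized by its id, the stored rows contain no duplicates even though the batch was delivered twice.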


### 8.1.8. Typical structure of a Spark Streaming application

In general, Spark streaming applications have the following structure:
1. Read from the streaming source into an input dataframe (input source)
2. Process the input dataframe and transform it into the output dataframe
3. Write the output dataframe (output sink)
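
The three steps can be sketched as a single function. This is only an illustrative skeleton: the function name `start_even_flag_query`, the `is_even` column and the checkpoint directory are assumptions, and the `spark` argument is expected to be an existing `SparkSession`. It combines the rate source and console sink shown above with the `checkpointLocation` option used for restarts.

```python
def start_even_flag_query(spark, checkpoint_dir):
    """Read from the rate source, add a column and print the result to the console."""
    # 1. Input source: the "rate" source emits `timestamp` and `value` columns.
    input_df = (
        spark.readStream
        .format("rate")
        .option("rowsPerSecond", 2)
        .load()
    )

    # 2. Transformation: a stateless projection that flags even values.
    output_df = input_df.selectExpr(
        "timestamp", "value", "value % 2 = 0 AS is_even"
    )

    # 3. Output sink: write each micro-batch to the console, recording
    #    progress in the checkpoint directory.
    return (
        output_df.writeStream
        .outputMode("append")
        .format("console")
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

Assuming a session named `spark`, calling `query = start_even_flag_query(spark, "checkpoints/even_flag")` followed by `query.awaitTermination()` would keep the application running until it is stopped.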

### Exercises

First let's start with two labs to see how we can use the "rate" source and "memory" sink to work interactively:
- Lab 4: [Unit_8_input_source_rate_output_memory.py](exercises/Unit_8_input_source_rate_output_memory.py) (non interactive: submit with "spark-submit")
- Lab 5: [Unit_8_interactive_streaming.ipynb](exercises/Unit_8_interactive_streaming.ipynb) (interactive)

Then we can create a basic streaming app with the "socket" source so we can test the different output modes:
- Lab 6: [Unit_8_structured_streaming_basics.ipynb](exercises/Unit_8_structured_streaming_basics.ipynb)

Finally let's try to implement word count in a streaming app:
- Lab 7: [Unit_8_socket_wordcount.ipynb](exercises/Unit_8_socket_wordcount.ipynb)

## 8.2. Windowing and Aggregates

### 8.2.1. Stateless vs Stateful Transformations

Stateless transformations (complete output mode is not supported):
- select
- filter
- map
- flatMap
- explode

Stateful transformations (excessive state can lead to out-of-memory errors):
- Grouping
- Aggregations
- Windowing
- Joins

### 8.2.2. Event Time and Windowing    

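Concepts:
- Event time: the time at which the event was generated at the source, usually carried in a column of the data
- Trigger time: the time at which Spark starts processing a micro-batch
- Windowing: grouping rows into time intervals based on event time (tumbling window, sliding window)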

### 8.2.3. Tumbling Window

![Tumbling Window](http://bigdata.cesga.es/img/spark_streaming_tumbling_window.png)

In [None]:
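from pyspark.sql.functions import window, col
# Count rows in consecutive, non-overlapping 10-minute windows of event time
tumbling_window = df.groupBy(
    window(col("created_at"), "10 minutes")
).count()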
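
To see which window a given row falls into, the arithmetic can be reproduced in plain Python. This is a sketch under the assumption that windows are aligned to the Unix epoch with no start offset; the function `tumbling_window` is a hypothetical helper and not part of PySpark.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(event_time, size):
    """Return the [start, end) window of length `size` containing event_time."""
    # Time elapsed since the start of the window the event belongs to.
    offset_in_window = (event_time - EPOCH) % size
    start = event_time - offset_in_window
    return start, start + size


event_time = datetime(2022, 9, 10, 10, 12, 3, tzinfo=timezone.utc)
start, end = tumbling_window(event_time, timedelta(minutes=10))
print(start.time(), end.time())
```

Every event time belongs to exactly one tumbling window, so each row contributes to a single count.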

### 8.2.4. Sliding Window

![Sliding Window](http://bigdata.cesga.es/img/spark_streaming_sliding_window.png)

In [None]:
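from pyspark.sql.functions import window, col
# Count rows in 10-minute windows that start every 5 minutes (overlapping)
sliding_window = df.groupBy(
    window(col("created_at"), "10 minutes", "5 minutes")
).count()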
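
With a sliding window a single row can belong to several windows. The following plain-Python sketch lists them, again assuming windows aligned to the Unix epoch with no start offset; `sliding_windows` is a hypothetical helper, not a PySpark function.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(event_time, size, slide):
    """Return every [start, end) window of length `size`, started every
    `slide`, that contains event_time."""
    if slide <= timedelta(0):
        raise ValueError("slide must be positive")
    # Start of the latest window that begins at or before event_time.
    start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    # Walk backwards while the window still covers event_time.
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


event_time = datetime(2022, 9, 10, 10, 12, 3, tzinfo=timezone.utc)
windows = sliding_windows(event_time, timedelta(minutes=10), timedelta(minutes=5))
for start, end in windows:
    print(start.time(), end.time())
```

With a 10-minute window sliding every 5 minutes, each row is counted in two overlapping windows.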

### 8.2.5. Watermarking

Watermarking allows Spark to clean up the **state store** by discarding old data.

To define a watermark for a query we specify the event time column (it must be the same column used in the `groupBy`) and a threshold for how late the data is expected to arrive, in terms of event time. Late data that falls outside the watermark can be discarded.

In [None]:
sliding_window_with_watermark = df \
    .withWatermark("created_at", "30 minutes") \
    .groupBy(window(col("created_at"), "10 minutes", "5 minutes")) \
    .count()
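
The effect of the threshold can be illustrated with a simplified plain-Python model. This is an assumption-laden sketch, not Spark's actual implementation: event times are plain integers (minutes), the watermark for a micro-batch is taken as the maximum event time seen in earlier batches minus the threshold, and any row older than the watermark is dropped. The function `apply_watermark` is hypothetical.

```python
def apply_watermark(batches, threshold):
    """Split the event times (in minutes) of all batches into kept and dropped."""
    max_event_time = None
    kept, dropped = [], []
    for batch in batches:
        # The watermark is fixed when the batch starts, from data seen so far.
        watermark = None if max_event_time is None else max_event_time - threshold
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)  # too late: outside the watermark
            else:
                kept.append(event_time)
        if batch:
            newest = max(batch)
            max_event_time = newest if max_event_time is None else max(max_event_time, newest)
    return kept, dropped


# Three micro-batches of event times, with a 30-minute threshold.
kept, dropped = apply_watermark([[100, 105], [110, 70, 90], [60, 120]], threshold=30)
print(kept, dropped)
```

In the second batch the watermark is 75, so the row with event time 70 is dropped while the row with event time 90 is still accepted even though it arrived out of order.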

## 8.3. Joins

### 8.3.1. Stream-Static Joins

In [None]:
from pyspark.sql.functions import expr
streaming_df.join(static_df, expr(join_expr), join_type)  # join_expr: SQL condition string, join_type: e.g. "inner"

### 8.3.2. Stream-Stream Joins

In [None]:
from pyspark.sql.functions import expr
joined_stream = stream_1.join(stream_2, expr(join_expr), join_type)  # join_expr: SQL condition string, join_type: e.g. "inner"

## Submitting in local mode

```
spark-submit --master local[3] --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 Unit_8_kafka.py
```

## Useful functions

### to_timestamp

Converts a `Column` into `pyspark.sql.types.TimestampType` using the optionally specified format. Formats are specified according to Spark's datetime patterns.

In [15]:
from pyspark.sql import Row

df = spark.createDataFrame([Row(created_at='2022-09-10 10:12:03'), Row(created_at='2022-09-22 16:42:03')])
df.show()
df.collect()

+-------------------+
|         created_at|
+-------------------+
|2022-09-10 10:12:03|
|2022-09-22 16:42:03|
+-------------------+



[Row(created_at=u'2022-09-10 10:12:03'),
 Row(created_at=u'2022-09-22 16:42:03')]

In [16]:
from pyspark.sql.functions import to_timestamp, col
df2 = df.select(to_timestamp(col('created_at'), 'yyyy-MM-dd HH:mm:ss').alias('ts'))
df2.show()
df2.collect()

+-------------------+
|                 ts|
+-------------------+
|2022-09-10 10:12:03|
|2022-09-22 16:42:03|
+-------------------+



[Row(ts=datetime.datetime(2022, 9, 10, 10, 12, 3)),
 Row(ts=datetime.datetime(2022, 9, 22, 16, 42, 3))]

We can also use the `cast` method of a `Column`, but in this case we cannot indicate the time format:

In [17]:
df.select(col('created_at').cast('timestamp').alias('ts')).collect()

[Row(ts=datetime.datetime(2022, 9, 10, 10, 12, 3)),
 Row(ts=datetime.datetime(2022, 9, 22, 16, 42, 3))]

### from_json

Parses a column containing a JSON string.

In [50]:
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(key=1, value='{"order": 1, "products": [{"product": "P1", "amount": 1}, {"product": "P2", "amount": 1}], "customer": "C1"}'),
    Row(key=2, value='{"order": 2, "products": [{"product": "P3", "amount": 1}, {"product": "P2", "amount": 1}], "customer": "C2"}')
])
df.show(truncate=False)
df.collect()

+---+------------------------------------------------------------------------------------------------------------+
|key|value                                                                                                       |
+---+------------------------------------------------------------------------------------------------------------+
|1  |{"order": 1, "products": [{"product": "P1", "amount": 1}, {"product": "P2", "amount": 1}], "customer": "C1"}|
|2  |{"order": 2, "products": [{"product": "P3", "amount": 1}, {"product": "P2", "amount": 1}], "customer": "C2"}|
+---+------------------------------------------------------------------------------------------------------------+



[Row(key=1, value=u'{"order": 1, "products": [{"product": "P1", "amount": 1}, {"product": "P2", "amount": 1}], "customer": "C1"}'),
 Row(key=2, value=u'{"order": 2, "products": [{"product": "P3", "amount": 1}, {"product": "P2", "amount": 1}], "customer": "C2"}')]

In [49]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, IntegerType, LongType

schema = StructType([
    StructField("order", LongType()),
    StructField("products", 
        ArrayType(
            StructType([
                StructField("product", StringType()),
                StructField("amount", IntegerType())
            ])
        )
    ),
    StructField("customer", StringType())
])
    
               
df2 = df.select(from_json(col('value').cast('string'), schema).alias('converted'))
df2.show(truncate=False)
df2.collect()

+---------------------------+
|converted                  |
+---------------------------+
|[1, [[P1, 1], [P2, 1]], C1]|
|[2, [[P3, 1], [P2, 1]], C2]|
+---------------------------+



[Row(converted=Row(order=1, products=[Row(product=u'P1', amount=1), Row(product=u'P2', amount=1)], customer=u'C1')),
 Row(converted=Row(order=2, products=[Row(product=u'P3', amount=1), Row(product=u'P2', amount=1)], customer=u'C2'))]

Some quick rules of thumb for schema definition:
```
    {} -> StructType
    [] -> ArrayType
    123 -> IntegerType, LongType
    12.24 -> FloatType, DoubleType
    "text" -> StringType
    true -> BooleanType
```
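
These rules can be applied mechanically to a sample message. The plain-Python sketch below is a hypothetical helper (`suggest_type` is not part of PySpark) that walks a parsed JSON value and prints the suggested type names. As simplifying assumptions, it always picks `LongType` for integers and `DoubleType` for decimals, looks only at the first element of an array, and falls back to `StringType` for `null` values and empty arrays.

```python
import json


def suggest_type(value):
    """Suggest a Spark SQL type name for a parsed JSON value."""
    if isinstance(value, dict):
        fields = ", ".join(
            f"{name}: {suggest_type(field)}" for name, field in value.items()
        )
        return f"StructType({fields})"
    if isinstance(value, list):
        element = suggest_type(value[0]) if value else "StringType"
        return f"ArrayType({element})"
    if isinstance(value, bool):  # checked before int: bool is a subclass of int
        return "BooleanType"
    if isinstance(value, int):
        return "LongType"
    if isinstance(value, float):
        return "DoubleType"
    return "StringType"


sample = '{"order": 1, "products": [{"product": "P1", "amount": 1}], "customer": "C1"}'
suggestion = suggest_type(json.loads(sample))
print(suggestion)
```

The output is only a starting point for writing the real schema by hand, where narrower types such as `IntegerType` can be chosen when appropriate.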

It is useful to import the `types` module under an alias so that autocompletion can be used to look up the types:

In [36]:
import pyspark.sql.types as T

### to_json

Converts a column containing a StructType, ArrayType or a MapType into a JSON string.

In [27]:
from pyspark.sql import Row

df = spark.createDataFrame([Row(key=1, value=Row(created_at="2022-09-10 16:00:01", order_id=1)),
                            Row(key=2, value=Row(created_at="2022-09-10 16:00:01", order_id=2))])
df.show(truncate=False)
df.collect()

+---+------------------------+
|key|value                   |
+---+------------------------+
|1  |[2022-09-10 16:00:01, 1]|
|2  |[2022-09-10 16:00:01, 2]|
+---+------------------------+



[Row(key=1, value=Row(created_at=u'2022-09-10 16:00:01', order_id=1)),
 Row(key=2, value=Row(created_at=u'2022-09-10 16:00:01', order_id=2))]

In [28]:
from pyspark.sql.functions import to_json

df2 = df.select(to_json(col('value')).alias('json'))
df2.show(truncate=False)
df2.collect()

+-------------------------------------------------+
|json                                             |
+-------------------------------------------------+
|{"created_at":"2022-09-10 16:00:01","order_id":1}|
|{"created_at":"2022-09-10 16:00:01","order_id":2}|
+-------------------------------------------------+



[Row(json=u'{"created_at":"2022-09-10 16:00:01","order_id":1}'),
 Row(json=u'{"created_at":"2022-09-10 16:00:01","order_id":2}')]

## Summary of Streaming Concepts

- Input Sources
- Output Sinks
- Output mode
- Streaming Query
- Schema
- Triggers
- Checkpointing
- Fault tolerance and exactly once processing

## Exercises

- Lab Unit_8_tumbling_window.ipynb
- Lab Unit_8_sliding_window.ipynb
- Lab Processing Orders:
  - Unit_8_processing_orders_lab.py
  - Unit_8_orders_producer_kafka-python.py
- Lab Twitter
  - Unit_8_twitter_developing_the_app.ipynb
  - Unit_8_twitter_to_kafka.py
  - Unit_8_twitter_sentiment_analysis.py
  - Unit_8_twitter_sentiment_analysis_alternative_implementation_with_textblob.py

## Spark Streaming API (DStream)

The Spark Streaming API (DStream) is the implementation of Spark Streaming based on RDDs. It no longer receives updates, but you can find it in existing projects.

In this lab we will see how it has been used in a real-life use case to detect SSH brute-force attacks in real-time:

- Lab Spark Streaming (DStream): Review the code of a production app using the legacy API
  - Unit_8_ssh_attack_detector-dstream_app.py
  - Unit_8_ssh_attack_detector-submit_script.sh

## Learning More

- DStream: [Spark Streaming Programming Guide (legacy)](https://spark.apache.org/docs/latest/streaming-programming-guide.html)
- [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Structured Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)
- [Real-Time Stream Processing Using Apache Spark 3 for Python Developers](https://www.packtpub.com/product/real-time-stream-processing-using-apache-spark-3-for-python-developers-video/9781803246543)
- [Beginning Apache Spark 3: With DataFrame, Spark SQL, Structured Streaming, and Spark Machine Learning Library](https://www.amazon.es/Beginning-Apache-Spark-DataFrame-Structured/dp/1484273826)
- [Spark in Action, Second Edition](https://www.manning.com/books/spark-in-action-second-edition)