## Structured Streaming
- A scalable and fault-tolerant stream processing engine
- Introduced in Spark 2.0 and marked production-ready in Spark 2.2
- Built on the Spark SQL engine and based on the DataFrame and Dataset APIs
- Streaming computations can be expressed as ordinary SQL queries or DataFrame operations


### Working 

- By default, Structured Streaming polls its sources for new data at each trigger interval and processes whatever has arrived as a small micro-batch.
- Unlike the older DStream API, the programming model does not expose batches: you write the query against a table, not against per-interval chunks.

- The data received in a trigger is appended to the unbounded input table. The new rows are processed incrementally, and the result is written into the result table.

- Structured Streaming uses the DataFrame and Dataset APIs to express streaming operations.

- DataFrames go through the Spark SQL optimizer and offer a wide range of built-in aggregations and other functions.


### Advantages 

- Supports event-time processing.

- When each record carries the timestamp of the event, Structured Streaming can process data by event time rather than arrival time, so records that arrive late are still assigned to the correct window instead of being lost.

- Besides checkpointing, Structured Streaming relies on two conditions to recover from failures:

   1. The source must be replayable.
   2. The sink must support idempotent operations, so that reprocessing after a failure does not produce duplicates.




In [1]:
# import the necessary classes and create a local SparkSession

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()



In [2]:
# Create a streaming DataFrame representing the stream of input lines
# received from a server listening on localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

In [1]:
 
# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Before starting the query, open a terminal and start a Netcat server:
#   nc -lk 9999

# Uncomment to block until the query is stopped or fails:
# query.awaitTermination()

## Programming Model

- The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended to.
- In this stream processing model, you express your streaming computation as a standard batch-like query on a static table, and Spark runs it as an incremental query on the unbounded input table.

### Basic concepts 
- Think of the input data stream as the “Input Table”: every data item arriving on the stream is a new row appended to it.
- A query on the input generates the “Result Table”. Every trigger interval, new rows are appended to the Input Table, which eventually updates the Result Table.

- The “Output” is what gets written out to external storage. It can be defined in one of the following modes:
    - <B>Complete Mode</B> - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
    
    - <b>Append Mode</b> - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
    
    - <b>Update Mode </b> - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.

<B>Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).</B>
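
To make the three output modes concrete, here is a minimal plain-Python model of a running word count. It is not Spark code: the helper names `run_trigger` and `emit` and the two sample triggers are assumptions made for illustration. It only shows which rows each mode would hand to the sink after a trigger.

```python
from collections import Counter

def run_trigger(result_table, new_words):
    """Fold one trigger's words into the running counts.

    Returns (previous, current) snapshots of the result table.
    """
    previous = dict(result_table)
    result_table.update(new_words)  # Counter.update adds to existing counts
    return previous, dict(result_table)

def emit(previous, current, mode):
    """Rows a sink would receive after a trigger, per output mode."""
    if mode == "complete":   # the whole result table
        return dict(current)
    if mode == "update":     # new rows and rows whose value changed
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":     # only rows that did not exist before
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")

table = Counter()
run_trigger(table, ["cat", "dog"])                      # first trigger
previous, current = run_trigger(table, ["dog", "owl"])  # second trigger

complete_out = emit(previous, current, "complete")
update_out = emit(previous, current, "update")
append_out = emit(previous, current, "append")
print(complete_out, update_out, append_out)
```

After the second trigger, update mode carries the changed `dog` row and the new `owl` row, while append mode carries only `owl`. The changed `dog` count never reaches an append sink, which is why that mode is meant for queries whose existing rows are not expected to change.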

### Handling Event-time and Late Data
- One advantage of Structured Streaming is its handling of late-arriving event data.
- Since Spark 2.1, watermarking lets the user specify a threshold for late data and lets the engine clean up old state accordingly.
- Because Spark itself updates the Result Table, it has full control over updating old aggregates when late data arrives.

### Fault Tolerance Semantics

The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger.

- After a failure, computation does not start from scratch: the query resumes from the recorded offsets.
- Every streaming source is assumed to have offsets (similar to Kafka offsets or Kinesis sequence numbers) that track the read position in the stream.
- Streaming sinks are designed to be idempotent, so that reprocessing the same data does not produce duplicate output.
- Together, replayable sources and idempotent sinks let Structured Streaming ensure end-to-end exactly-once semantics under any failure.
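
The interplay of offset logging, replayable sources, and idempotent sinks can be sketched in plain Python. This is a toy model under stated assumptions, not Spark's implementation: the list `source`, the dict `offset_log`, the class `IdempotentSink`, and the function `process` are all hypothetical names chosen for the example.

```python
class IdempotentSink:
    """Keeps at most one copy of each batch's output, keyed by batch id."""

    def __init__(self):
        self.committed = {}

    def write(self, batch_id, rows):
        # Overwriting by batch id makes a replayed write harmless.
        self.committed[batch_id] = list(rows)

def process(source, offset_log, sink, batch_id, start, end):
    offset_log[batch_id] = (start, end)            # log the offset range first
    rows = [r.upper() for r in source[start:end]]  # re-readable by offset
    sink.write(batch_id, rows)

source = ["a", "b", "c", "d"]   # stands in for a replayable source
offset_log = {}                 # stands in for the write-ahead log
sink = IdempotentSink()

process(source, offset_log, sink, 0, 0, 2)
process(source, offset_log, sink, 1, 2, 4)

# Simulate a crash during batch 1: on restart, the batch is replayed
# from the offsets recorded in the log.
start, end = offset_log[1]
process(source, offset_log, sink, 1, start, end)

all_rows = [r for b in sorted(sink.committed) for r in sink.committed[b]]
print(all_rows)
```

Batch 1 is processed twice, yet the sink holds each row once, because the replay reads the same offset range and overwrites the same batch id.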

### Programming
- Use the common entry point SparkSession to create streaming DataFrames/Datasets.
- Streaming DataFrames are created through the DataStreamReader interface returned by SparkSession.readStream (an attribute in Python, so no parentheses).
- On the reader, you specify the details of the source: data format, schema, options.


### Input Sources

#### File source   
 - Reads files written in a directory as a stream of data.
 - Files are processed in order of file modification time (if latestFirst is set, the order is reversed).
 - Supported file formats are text, CSV, JSON, ORC, and Parquet.

#### Kafka source
   - Compatible with Kafka broker versions 0.10.0 or higher.

#### Socket source (for testing)
   - Reads UTF-8 text data from a socket connection.
   - Should be used only for testing, as it does not provide end-to-end fault-tolerance guarantees.

#### Rate source (for testing)
  - Generates data at the specified number of rows per second. Each output row contains a timestamp (the time of message dispatch) and a value (the message count, starting from 0 for the first row).


In [None]:
# create sparksession
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming    # True for DataFrames that have streaming sources (a property, not a method)

socketDF.printSchema()

# Read all the CSV files written atomically in a directory
from pyspark.sql.types import StructType

userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")



You can also create streaming DataFrames from tables with DataStreamReader.table().

### Schema inference and partition of streaming DataFrames/Datasets

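- By default, file-based sources require you to specify the schema instead of relying on Spark to infer it. For ad-hoc use, inference can be re-enabled by setting spark.sql.streaming.schemaInference to true.
- When partition discovery is used, the directories that make up the partitioning scheme must be present when the query starts and must remain static.

- Both SQL-like operations and typed RDD-like operations (e.g. map, filter, flatMap) can be applied to streaming DataFrames/Datasets.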


In [None]:
# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()

# register a streaming DataFrame/Dataset as a temporary view and then apply SQL commands 
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF

# Check whether a DataFrame/Dataset has streaming data (isStreaming is a property in PySpark)
df.isStreaming

# Window Operations on Event Time


### Programming advice
- When in doubt, check the query plan, because Spark may inject stateful operations while interpreting a SQL statement against a streaming dataset.
- Once stateful operations appear in the query plan, review the query with the considerations for stateful operations in mind (e.g. output mode, watermark, state store size maintenance).

### Window Operations on Event Time
- Windowing is similar to grouping, so in code you use groupBy() and window() to express windowed aggregations.
- In a window-based aggregation, aggregate values are maintained for each window that the event time of a row falls into. With overlapping (sliding) windows, one row can contribute to several windows.
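
As a rough illustration of how one row can fall into several sliding windows, the plain-Python sketch below assigns an event to 10-minute windows that slide every 5 minutes. It is a simplified model, not Spark's `window()` function: times are integer minutes since midnight, windows are assumed to start at multiples of the slide, and the helper names `windows_for` and `fmt` are made up for this example.

```python
def windows_for(event_minute, window_len=10, slide=5):
    """Return the [start, end) windows, in minutes, that contain event_minute."""
    last_start = (event_minute // slide) * slide   # latest window start <= event
    starts = range(last_start, event_minute - window_len, -slide)
    return sorted((s, s + window_len) for s in starts)

def fmt(minute):
    """Format minutes since midnight as HH:MM."""
    return f"{minute // 60:02d}:{minute % 60:02d}"

event = 12 * 60 + 7   # an event at 12:07
assigned = [(fmt(s), fmt(e)) for s, e in windows_for(event)]
print(assigned)
```

The 12:07 event lands in both the 12:00 to 12:10 window and the 12:05 to 12:15 window, so a windowed count would increment two aggregates for that one row.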

In [None]:
from pyspark.sql.functions import window

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

### Handling Late Data and Watermarking

- Structured Streaming can maintain the intermediate state for partial aggregates for a long period of time, so that late data can update aggregates of old windows correctly.

- However, the system has to bound the amount of intermediate in-memory state it accumulates.

- It therefore needs to know when an old aggregate can be dropped from the in-memory state because the application is not going to receive late data for that aggregate any more.

- To enable this, Spark 2.1 introduced watermarking, which lets the engine automatically track the current event time in the data and clean up old state accordingly.
- Late data within the threshold is aggregated, but data later than the threshold starts getting dropped.

- You define the watermark of a query by specifying the event-time column and the threshold on how late the data is expected to be in terms of event time. For a specific window ending at time T, the engine maintains state and allows late data to update it until (max event time seen by the engine - late threshold > T).
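
The state-retention rule above can be written as a small plain-Python check. This is only a sketch of the inequality, with times as integer minutes since midnight and hypothetical helper names (`current_watermark`, `keeps_state`). It ignores the detail that the real engine advances the watermark between triggers rather than per record.

```python
def current_watermark(max_event_time, delay):
    """Watermark = latest event time seen so far minus the lateness threshold."""
    return max_event_time - delay

def keeps_state(window_end, max_event_time, delay):
    """True while late data may still update the window ending at window_end."""
    return not (current_watermark(max_event_time, delay) > window_end)

DELAY = 10                   # a 10-minute lateness threshold
WINDOW_END = 12 * 60 + 10    # a window ending at 12:10

open_at_1214 = keeps_state(WINDOW_END, 12 * 60 + 14, DELAY)  # watermark is 12:04
open_at_1221 = keeps_state(WINDOW_END, 12 * 60 + 21, DELAY)  # watermark is 12:11
print(open_at_1214, open_at_1221)
```

Once the engine has seen an event at 12:21, the watermark (12:11) has passed the window end (12:10), so the window's state becomes eligible for cleanup and later records for it may be dropped.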

In [None]:
from pyspark.sql.functions import window

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()



#### Following conditions must be satisfied for the watermarking to clean the state in aggregation queries

- Output mode must be Append or Update.

- The aggregation must have either the event-time column, or a window on the event-time column.

- withWatermark() must be called on the same column as the timestamp column used in the aggregate

- withWatermark must be called before the aggregation for the watermark details to be used.

- A watermark delay (set with withWatermark) of “2 hours” guarantees that the engine will never drop any data that is less than 2 hours delayed. Data delayed by more than 2 hours is not guaranteed to be dropped; it may or may not get aggregated

### Join Operations
- Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame as well as another streaming Dataset/DataFrame

In [None]:
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF



#### Stream-stream Joins
For both input streams, past input is buffered as streaming state, so that every future input can be matched with past input and the joined results generated accordingly.

##### Types of supported stream-stream joins

 - Inner Joins with optional Watermarking
 


In [None]:


from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)



 - Outer Joins with Watermarking

In [None]:


impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)



 - Semi Joins with Watermarking

### Streaming Deduplication (eliminate duplicate or redundant information)
- You can deduplicate records in data streams using a unique identifier in the events.
- Deduplication can be used with or without watermarking.
    - With watermark: If there is an upper bound on how late a duplicate record may arrive, you can define a watermark on an event-time column and deduplicate using both the guid and the event-time columns. The query uses the watermark to remove old state for past records that are no longer expected to get duplicates.
    - Without watermark: Since there is no bound on when a duplicate record may arrive, the query stores the data from all past records as state.
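
The effect of the watermark on deduplication state can be modelled in plain Python. The sketch below is a simplification, not Spark's implementation: records are `(guid, event_time)` tuples with integer times, the watermark advances once per trigger, and the function name `dedup_stream` and the sample batches are assumptions made for the example.

```python
def dedup_stream(batches, delay):
    """Deduplicate (guid, event_time) records with a watermark-bounded state.

    Returns the emitted records and the state size after each trigger.
    """
    seen = set()              # state: keys already emitted
    max_event_time = None
    watermark = None
    emitted, state_sizes = [], []
    for batch in batches:
        for guid, event_time in batch:
            if watermark is not None and event_time < watermark:
                continue      # older than the watermark: dropped as too late
            key = (guid, event_time)
            if key not in seen:
                seen.add(key)
                emitted.append(key)
        # End of trigger: advance the watermark, then evict expired state.
        times = [t for _, t in batch]
        if times:
            batch_max = max(times)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
            seen = {k for k in seen if k[1] >= watermark}
        state_sizes.append(len(seen))
    return emitted, state_sizes

batches = [
    [("a", 1), ("b", 2), ("a", 1)],   # duplicate inside one trigger
    [("c", 30), ("a", 1)],            # duplicate arriving one trigger later
    [("a", 1)],                       # duplicate arriving after the watermark passed
]
emitted, state_sizes = dedup_stream(batches, delay=10)
print(emitted, state_sizes)
```

Each record is emitted once, and the state shrinks from two keys to one as soon as the watermark moves past the old event times. Without a watermark, nothing would ever be evicted and the state would keep growing.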

In [None]:

streamingDf = spark.readStream. ...

# Without watermark using guid column
# (column names are passed as a list, the form dropDuplicates(subset=...) expects)
streamingDf.dropDuplicates(["guid"])

# With watermark using guid and eventTime columns
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates(["guid", "eventTime"])



### Policy for handling multiple watermarks
- A streaming query can have multiple input streams that are unioned or joined together. Each stream can have a different threshold of late data that needs to be tolerated for stateful operations. You specify these thresholds using withWatermark("eventTime", delay) on each of the input streams.
- By default, the engine uses the minimum of the per-stream watermarks as the global watermark; the spark.sql.streaming.multipleWatermarkPolicy setting controls this.

- If you want to track sessions from data streams of events, you have to save arbitrary types of data as state and perform arbitrary operations on that state using the data stream events in every trigger.
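
For the multiple-watermark policy, the choice between the two values of spark.sql.streaming.multipleWatermarkPolicy (`min`, the default, and `max`) comes down to which per-stream watermark becomes the global one. The plain-Python sketch below models only that selection. The function name `global_watermark`, the stream names, and the watermark values are illustrative assumptions.

```python
def global_watermark(stream_watermarks, policy="min"):
    """Pick the global watermark from per-stream watermarks."""
    if policy == "min":   # follow the slowest stream: nothing is dropped early
        return min(stream_watermarks.values())
    if policy == "max":   # follow the fastest stream: lower latency, more drops
        return max(stream_watermarks.values())
    raise ValueError(f"unknown policy: {policy}")

# Hypothetical per-stream watermarks, in minutes since some reference time.
per_stream = {"impressions": 100, "clicks": 40}

safe = global_watermark(per_stream)          # default policy
fast = global_watermark(per_stream, "max")
print(safe, fast)
```

With `min`, the lagging `clicks` stream holds the global watermark back, so its data is not dropped as too late. With `max`, results can be emitted sooner, but records from the slower stream may be discarded.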

### Unsupported Operations
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

- Multiple streaming aggregations
- Limit and take the first N rows are not supported on streaming Datasets.
- Distinct operations on streaming Datasets are not supported.
- Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.


The following Dataset methods will not work on streaming Datasets. They are actions that immediately run queries and return results, which does not make sense on a streaming Dataset. If you try any of them, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”.

   -  count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.

   - foreach() - Instead use ds.writeStream.foreach(...) (see next section).

   - show() - Instead use the console sink (see next section).


### Limitation of global watermark


### Starting Streaming Queries

In [None]:


# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")   

# Print new data to console
noAggDF \
    .writeStream \
    .format("console") \
    .start()

# Write new data to Parquet files
noAggDF \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .option("path", "path/to/destination/dir") \
    .start()

# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()

spark.sql("select * from aggregates").show()   # interactively query in-memory table



### Streaming Table APIs

In [None]:


spark = ...  # spark session

# Create a streaming DataFrame
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Write the streaming DataFrame to a table
df.writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .toTable("myTable")

# Check the table result
spark.read.table("myTable").show()

# Transform the source dataset and write to a new table
# (each query needs its own checkpoint location, so use a different directory here)
spark.readStream \
    .table("myTable") \
    .select("value") \
    .writeStream \
    .option("checkpointLocation", "path/to/another/checkpoint/dir") \
    .format("parquet") \
    .toTable("newTable")

# Check the new table result
spark.read.table("newTable").show()



### Triggers
- The trigger settings of a streaming query define the timing of streaming data processing: whether the query is executed as a micro-batch query with a fixed batch interval or as a continuous processing query.

In [None]:


# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
  .format("console") \
  .start()

# ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

# One-time trigger
df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

# Continuous trigger with one-second checkpointing interval
df.writeStream \
  .format("console") \
  .trigger(continuous='1 second') \
  .start()



### Managing Streaming Queries
- The StreamingQuery object created when a query is started can be used to monitor and manage the query.



In [None]:


query = df.writeStream.format("console").start()   # get the query object

# In PySpark, id, runId, name, recentProgress and lastProgress are properties (no parentheses)
query.id            # unique identifier of the running query; persists across restarts from checkpoint data

query.runId         # unique id of this run of the query; generated at every start/restart

query.name          # the auto-generated or user-specified name

query.explain()   # print detailed explanations of the query

query.stop()      # stop the query

query.awaitTermination()   # block until query is terminated, with stop() or with error

query.exception()       # the exception if the query has been terminated with error

query.recentProgress    # a list of the most recent progress updates for this query

query.lastProgress      # the most recent progress update of this streaming query



You can start any number of queries in a single SparkSession. They will all be running concurrently sharing the cluster resources.

In [None]:


spark = ...  # spark session

spark.streams.active  # get the list of currently active streaming queries

spark.streams.get(id)  # get a query object by its unique id

spark.streams.awaitAnyTermination()  # block until any one of them terminates



### Monitoring Streaming Queries
- There are multiple ways to monitor active streaming queries. You can either push metrics to external systems using Spark’s Dropwizard Metrics support, or access them programmatically.

####  Reading Metrics Interactively

In [None]:

query = ...  # a StreamingQuery
print(query.lastProgress)

'''
Will print something like the following.

{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''

print(query.status)
''' 
Will print something like the following.

{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''




### Reporting Metrics programmatically

- You can also asynchronously monitor all queries associated with a SparkSession by attaching a StreamingQueryListener.

- The listener was originally available only in Scala and Java; PySpark added StreamingQueryListener support in Spark 3.4.

#### Reporting Metrics using Dropwizard
- Spark supports reporting metrics using the Dropwizard Library. To enable metrics of Structured Streaming queries to be reported as well, you have to explicitly enable the configuration spark.sql.streaming.metricsEnabled in the SparkSession

In [None]:
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")



### Recovering from Failures with Checkpointing

In [None]:


aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()



### Recovery Semantics after Changes in a Streaming Query
Here are a few kinds of changes that are either not allowed, or whose effect is not well-defined:

- Changes in the number or type (i.e. different source) of input sources
- Changes in the parameters of input sources
- Changes in the type of output sink
- Changes in the parameters of output sink
- Changes in projection / filter / map-like operations
- Changes in stateful operations



### Continuous Processing

- Continuous processing is an experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees.
- Compare this with the default micro-batch processing engine, which can achieve exactly-once guarantees but reaches latencies of ~100 ms at best.
- To run a supported query in continuous processing mode, all you need to do is specify a continuous trigger with the desired checkpoint interval as a parameter.


In [None]:


# Parentheses replace backslash continuations so that the inline comment is valid Python
(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(continuous="1 second")     # only change in query
  .start())



### Supported Queries

Only the following types of queries are supported in continuous processing mode.
    
   - Operations: 
       - Only map-like Dataset/DataFrame operations are supported in continuous mode, that is
       - only projections (select, map, flatMap, mapPartitions, etc.)
       - and selections (where, filter, etc.)
    
   - Sources:
       - Kafka source: All options are supported.
       - Rate source: Good for testing. The only options supported in continuous mode are numPartitions and rowsPerSecond.
       
   - Sinks: 
       - Kafka sink: All options are supported.
       - Memory sink: Good for debugging.
       - Console sink: Good for debugging. All options are supported. Note that the console will print every checkpoint interval that you have specified in the continuous trigger.
       



### Caveats (usage warnings)

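- Before starting a continuous processing query, you must ensure there are enough cores in the cluster to run all the tasks in parallel. For example, reading from a Kafka topic with 10 partitions requires at least 10 cores for the query to make progress.

- There are currently no automatic retries of failed tasks. Any failure stops the query, which then has to be restarted manually from the checkpoint.

- Some useful configurations to know: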
    - spark.sql.shuffle.partitions
    - spark.sql.streaming.stateStore.providerClass
    - spark.sql.streaming.multipleWatermarkPolicy