Skip to content

Commit

Permalink
StreamingAggregationStateManagers
Browse files Browse the repository at this point in the history
  • Loading branch information
jaceklaskowski committed Nov 26, 2018
1 parent 0bff451 commit bd0b84c
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 52 deletions.
20 changes: 11 additions & 9 deletions SUMMARY.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

. link:spark-sql-streaming-StreamingQueryManager.adoc[StreamingQueryManager -- Streaming Query Management]

. link:spark-sql-streaming-SQLConf.adoc[SQLConf -- Internal Configuration Store]
. link:spark-sql-streaming-properties.adoc[Configuration Properties]

=== Kafka Data Source
Expand Down Expand Up @@ -163,7 +164,7 @@

. link:spark-sql-streaming-OffsetSeqMetadata.adoc[OffsetSeqMetadata]

== Managing State in Stateful Streaming Aggregations
== Managing State in Stateful Stream Processing

. link:spark-sql-streaming-StateStore.adoc[StateStore Contract -- Kay-Value Store for State Management]
.. link:spark-sql-streaming-StateStoreOps.adoc[StateStoreOps -- Implicits Methods for Creating StateStoreRDD]
Expand All @@ -179,11 +180,18 @@
. link:spark-sql-streaming-StateStoreCoordinator.adoc[StateStoreCoordinator -- Tracking Locations of StateStores for StateStoreRDD]
.. link:spark-sql-streaming-StateStoreCoordinatorRef.adoc[StateStoreCoordinatorRef Interface for Communication with StateStoreCoordinator]

. link:spark-sql-streaming-StreamingAggregationStateManager.adoc[StreamingAggregationStateManager Contract -- State Managers for Streaming Aggregation]
.. link:spark-sql-streaming-StreamingAggregationStateManagerBaseImpl.adoc[StreamingAggregationStateManagerBaseImpl -- Base StreamingAggregationStateManager]
.. link:spark-sql-streaming-StreamingAggregationStateManagerImplV1.adoc[StreamingAggregationStateManagerImplV1 -- Legacy State Manager for Streaming Aggregations]
.. link:spark-sql-streaming-StreamingAggregationStateManagerImplV2.adoc[StreamingAggregationStateManagerImplV2 -- Default State Manager for Streaming Aggregations]

== Monitoring

. link:spark-sql-streaming-StreamingQueryListener.adoc[StreamingQueryListener -- Intercepting Streaming Events]
.. link:spark-sql-streaming-StreamingQueryProgress.adoc[StreamingQueryProgress]

. link:spark-sql-streaming-StreamProgress.adoc[StreamProgress Custom Scala Map]

. link:spark-sql-streaming-webui.adoc[Web UI]

. link:spark-sql-streaming-logging.adoc[Logging]
Expand All @@ -192,11 +200,11 @@

. link:spark-sql-streaming-DataSource.adoc[DataSource -- Pluggable Data Source]

. link:spark-sql-streaming-StreamSourceProvider.adoc[StreamSourceProvider -- Streaming Data Source Provider]
. link:spark-sql-streaming-Source.adoc[Streaming Source]
.. link:spark-sql-streaming-StreamSourceProvider.adoc[StreamSourceProvider -- Streaming Data Source Provider]

. link:spark-sql-streaming-StreamSinkProvider.adoc[StreamSinkProvider]
. link:spark-sql-streaming-Sink.adoc[Streaming Sink -- Adding Batches of Data to Storage]
.. link:spark-sql-streaming-StreamSinkProvider.adoc[StreamSinkProvider]

== Demos

Expand All @@ -208,9 +216,3 @@
== Varia

. link:spark-sql-streaming-UnsupportedOperationChecker.adoc[UnsupportedOperationChecker]
. link:spark-sql-streaming-StreamProgress.adoc[StreamProgress Custom Scala Map]

. link:spark-sql-streaming-StreamingAggregationStateManager.adoc[StreamingAggregationStateManager Contract -- State Managers for Streaming Aggregation]
.. link:spark-sql-streaming-StreamingAggregationStateManagerBaseImpl.adoc[StreamingAggregationStateManagerBaseImpl -- Base StreamingAggregationStateManager]
.. link:spark-sql-streaming-StreamingAggregationStateManagerImplV1.adoc[StreamingAggregationStateManagerImplV1]
.. link:spark-sql-streaming-StreamingAggregationStateManagerImplV2.adoc[StreamingAggregationStateManagerImplV2]
22 changes: 22 additions & 0 deletions spark-sql-streaming-SQLConf.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
== [[SQLConf]] SQLConf -- Internal Configuration Store

`SQLConf` is an *internal key-value configuration store* for <<parameters, parameters and hints>> used in Spark Structured Streaming (and Spark SQL in general).

[[accessor-methods]]
.SQLConf's Accessor Methods
[cols="1,1",options="header",width="100%"]
|===
| Name
| Description

| `STREAMING_AGGREGATION_STATE_FORMAT_VERSION`

<<spark-sql-streaming-properties.adoc#spark.sql.streaming.aggregation.stateFormatVersion, spark.sql.streaming.aggregation.stateFormatVersion>>
a| [[STREAMING_AGGREGATION_STATE_FORMAT_VERSION]]

Used when:

* <<spark-sql-streaming-StatefulAggregationStrategy.adoc#, StatefulAggregationStrategy>> execution planning strategy is executed
* `OffsetSeqMetadata` is requested for `relevantSQLConfs` and `relevantSQLConfDefaultValues`
|===
4 changes: 2 additions & 2 deletions spark-sql-streaming-StateStoreRestoreExec.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

`StateStoreRestoreExec` is a unary physical operator (i.e. `UnaryExecNode`) that link:spark-sql-streaming-StateStoreReader.adoc[restores a state from a state store] (for the keys in the input rows).

`StateStoreRestoreExec` is <<creating-instance, created>> exclusively when `StatefulAggregationStrategy` link:spark-sql-streaming-StatefulAggregationStrategy.adoc#Aggregate[plans streaming aggregate operators] (aka _streaming aggregates_).
`StateStoreRestoreExec` is <<creating-instance, created>> exclusively when <<spark-sql-streaming-StatefulAggregationStrategy.adoc#, StatefulAggregationStrategy>> execution planning strategy is executed (and plans `Aggregate` logical operators in a streaming structured query).

.StateStoreRestoreExec and StatefulAggregationStrategy
image::images/StateStoreRestoreExec-StatefulAggregationStrategy.png[align="center"]
Expand Down Expand Up @@ -121,5 +121,5 @@ NOTE: There is no way in `StateStoreRestoreExec` to find out how many rows had a
* [[keyExpressions]] Catalyst expressions for keys (as used for aggregation in link:spark-sql-streaming-Dataset-operators.adoc#groupBy[groupBy] operator)
* [[stateInfo]] link:spark-sql-streaming-StatefulOperatorStateInfo.adoc[StatefulOperatorStateInfo]
* [[stateFormatVersion]] `stateFormatVersion`
* [[stateFormatVersion]] `stateFormatVersion` (that is the value of <<spark-sql-streaming-properties.adoc#spark.sql.streaming.aggregation.stateFormatVersion, spark.sql.streaming.aggregation.stateFormatVersion>> configuration property)
* [[child]] Child physical operator
4 changes: 2 additions & 2 deletions spark-sql-streaming-StateStoreSaveExec.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

`StateStoreSaveExec` is a unary physical operator (i.e. `UnaryExecNode`) that link:spark-sql-streaming-StateStoreWriter.adoc[saves a streaming state to a state store] with link:spark-sql-streaming-WatermarkSupport.adoc[support for streaming watermark].

`StateStoreSaveExec` is <<creating-instance, created>> exclusively when `StatefulAggregationStrategy` link:spark-sql-streaming-StatefulAggregationStrategy.adoc#Aggregate[plans streaming aggregate operators] (aka _streaming aggregates_).
`StateStoreSaveExec` is <<creating-instance, created>> exclusively when <<spark-sql-streaming-StatefulAggregationStrategy.adoc#, StatefulAggregationStrategy>> execution planning strategy is executed (and plans `Aggregate` logical operators in a streaming structured query).

.StateStoreSaveExec and StatefulAggregationStrategy
image::images/StateStoreSaveExec-StatefulAggregationStrategy.png[align="center"]
Expand Down Expand Up @@ -329,5 +329,5 @@ Invalid output mode: [outputMode]
* [[stateInfo]] link:spark-sql-streaming-StatefulOperatorStateInfo.adoc[StatefulOperatorStateInfo]
* [[outputMode]] link:spark-sql-streaming-OutputMode.adoc[Output mode]
* [[eventTimeWatermark]] Event time watermark (as `long` number)
* [[stateFormatVersion]] `stateFormatVersion`
* [[stateFormatVersion]] `stateFormatVersion` (that is the value of <<spark-sql-streaming-properties.adoc#spark.sql.streaming.aggregation.stateFormatVersion, spark.sql.streaming.aggregation.stateFormatVersion>> configuration property)
* [[child]] Child physical operator
2 changes: 1 addition & 1 deletion spark-sql-streaming-StatefulAggregationStrategy.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
== [[StatefulAggregationStrategy]] StatefulAggregationStrategy Execution Planning Strategy for EventTimeWatermark and Aggregate Logical Operators

`StatefulAggregationStrategy` is an execution planning strategy (i.e. `Strategy`) that link:spark-sql-streaming-IncrementalExecution.adoc#planner[IncrementalExecution] uses to <<apply, plan>> `EventTimeWatermark` and `Aggregate` logical operators in streaming Datasets.
`StatefulAggregationStrategy` is an execution planning strategy (i.e. `Strategy`) that link:spark-sql-streaming-IncrementalExecution.adoc#planner[IncrementalExecution] uses to <<apply, plan>> `EventTimeWatermark` and `Aggregate` logical operators in streaming structured queries (`Datasets`).

[NOTE]
====
Expand Down
14 changes: 8 additions & 6 deletions spark-sql-streaming-StreamingAggregationStateManager.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@

`StreamingAggregationStateManager` is the <<contract, contract>> of <<implementations, state managers>> that are used in streaming aggregations.

[[supportedVersions]]
`StreamingAggregationStateManager` supports <<createStateManager, two versions of state managers for streaming aggregations>>:

* [[legacyVersion]] `1` (the legacy implementation)
* `2`
[[contract]]
.StreamingAggregationStateManager Contract
[cols="1m,2",options="header",width="100%"]
Expand Down Expand Up @@ -92,6 +86,14 @@ values(store: StateStore): Iterator[UnsafeRow]
----
|===

[[supportedVersions]]
`StreamingAggregationStateManager` supports <<createStateManager, two versions of state managers for streaming aggregations>>:

* [[legacyVersion]] `1` (legacy)
* [[default]] `2` (default)
NOTE: The version of a state manager is controlled using <<spark-sql-streaming-properties.adoc#spark.sql.streaming.aggregation.stateFormatVersion, spark.sql.streaming.aggregation.stateFormatVersion>> internal configuration property.

[[implementations]]
NOTE: <<spark-sql-streaming-StreamingAggregationStateManagerBaseImpl.adoc#, StreamingAggregationStateManagerBaseImpl>> is the one and only known base implementation of the <<contract, StreamingAggregationStateManager Contract>> in Spark Structured Streaming.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
== [[StreamingAggregationStateManagerImplV1]] StreamingAggregationStateManagerImplV1
== [[StreamingAggregationStateManagerImplV1]] StreamingAggregationStateManagerImplV1 -- Legacy State Manager for Streaming Aggregations

`StreamingAggregationStateManagerImplV1` is a <<spark-sql-streaming-StreamingAggregationStateManagerBaseImpl.adoc#, StreamingAggregationStateManagerBaseImpl>> that is used for `stateFormatVersion` being `1`.
`StreamingAggregationStateManagerImplV1` is the legacy <<spark-sql-streaming-StreamingAggregationStateManagerBaseImpl.adoc#, state manager for streaming aggregations>>.

`StreamingAggregationStateManagerImplV1` is <<creating-instance, created>> exclusively when `StreamingAggregationStateManager` is requested for a <<spark-sql-streaming-StreamingAggregationStateManager.adoc#createStateManager, new StreamingAggregationStateManager>> .
NOTE: The version of a state manager is controlled using <<spark-sql-streaming-properties.adoc#spark.sql.streaming.aggregation.stateFormatVersion, spark.sql.streaming.aggregation.stateFormatVersion>> internal configuration property.

`StreamingAggregationStateManagerImplV1` is <<creating-instance, created>> exclusively when `StreamingAggregationStateManager` is requested for a <<spark-sql-streaming-StreamingAggregationStateManager.adoc#createStateManager, new StreamingAggregationStateManager>>.

=== [[put]] Storing Row in State Store -- `put` Method

Expand Down
15 changes: 13 additions & 2 deletions spark-sql-streaming-StreamingAggregationStateManagerImplV2.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
== [[StreamingAggregationStateManagerImplV2]] StreamingAggregationStateManagerImplV2
== [[StreamingAggregationStateManagerImplV2]] StreamingAggregationStateManagerImplV2 -- Default State Manager for Streaming Aggregations

`StreamingAggregationStateManagerImplV2` is...FIXME
`StreamingAggregationStateManagerImplV2` is the default <<spark-sql-streaming-StreamingAggregationStateManagerBaseImpl.adoc#, state manager for streaming aggregations>>.

NOTE: The version of a state manager is controlled using <<spark-sql-streaming-properties.adoc#spark.sql.streaming.aggregation.stateFormatVersion, spark.sql.streaming.aggregation.stateFormatVersion>> internal configuration property.

`StreamingAggregationStateManagerImplV2` is <<creating-instance, created>> exclusively when `StreamingAggregationStateManager` is requested for a <<spark-sql-streaming-StreamingAggregationStateManager.adoc#createStateManager, new StreamingAggregationStateManager>>.

=== [[put]] Storing Row in State Store -- `put` Method

Expand All @@ -12,3 +16,10 @@ put(store: StateStore, row: UnsafeRow): Unit
NOTE: `put` is part of the <<spark-sql-streaming-StreamingAggregationStateManager.adoc#put, StreamingAggregationStateManager Contract>> to store a row in a state store.

`put`...FIXME

=== [[creating-instance]] Creating StreamingAggregationStateManagerImplV2 Instance

`StreamingAggregationStateManagerImplV2` takes the following when created:

* [[keyExpressions]] Attribute expressions for keys (`Seq[Attribute]`)
* [[inputRowAttributes]] Attribute expressions for input rows (`Seq[Attribute]`)
70 changes: 43 additions & 27 deletions spark-sql-streaming-properties.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,59 @@ val spark: SparkSession = SparkSession.builder
----

.Structured Streaming's Properties (in alphabetical order)
[cols="1,1,2",options="header",width="100%"]
[cols="1,2",options="header",width="100%"]
|===
| Name
| Default
| Name / Default Value
| Description

| [[spark.sql.streaming.checkpointLocation]] `spark.sql.streaming.checkpointLocation`
| (empty)
| Default checkpoint directory for storing checkpoint data for streaming queries
| `spark.sql.streaming.aggregation.stateFormatVersion`

| [[spark.sql.streaming.metricsEnabled]] `spark.sql.streaming.metricsEnabled`
| `false`
| Flag whether Dropwizard CodaHale metrics will be reported for active streaming queries
`2`
| [[spark.sql.streaming.aggregation.stateFormatVersion]] *(internal)* State format version used by streaming aggregation operations in a streaming query.

| [[spark.sql.streaming.minBatchesToRetain]] `spark.sql.streaming.minBatchesToRetain`
| `100`
| (internal) The minimum number of batches that must be retained and made recoverable.
Supported values: `1` or `2`

State between versions are tend to be incompatible, so state format version shouldn't be modified after running.

| `spark.sql.streaming.checkpointLocation`

(empty)
| [[spark.sql.streaming.checkpointLocation]] Default checkpoint directory for storing checkpoint data for streaming queries

| `spark.sql.streaming.metricsEnabled`

`false`
| [[spark.sql.streaming.metricsEnabled]] Flag whether Dropwizard CodaHale metrics will be reported for active streaming queries

| `spark.sql.streaming.minBatchesToRetain`

`100`
a| [[spark.sql.streaming.minBatchesToRetain]] (internal) The minimum number of batches that must be retained and made recoverable.

Used...FIXME

| [[spark.sql.streaming.numRecentProgressUpdates]] `spark.sql.streaming.numRecentProgressUpdates`
| `100`
| Number of link:spark-sql-streaming-ProgressReporter.adoc#updateProgress[progress updates to retain] for a streaming query
| `spark.sql.streaming.numRecentProgressUpdates`

`100`
| [[spark.sql.streaming.numRecentProgressUpdates]] Number of link:spark-sql-streaming-ProgressReporter.adoc#updateProgress[progress updates to retain] for a streaming query

| `spark.sql.streaming.pollingDelay`

`10`
a| [[spark.sql.streaming.pollingDelay]] *(internal)* Time delay (in ms) before `StreamExecution` link:spark-sql-streaming-MicroBatchExecution.adoc#runBatches-batchRunner-no-data[polls for new data when no data was available in a batch].

| `spark.sql.streaming.stateStore.maintenanceInterval`

`60s`
| [[spark.sql.streaming.stateStore.maintenanceInterval]] The initial delay and how often to execute StateStore's link:spark-sql-streaming-StateStore.adoc#MaintenanceTask[maintenance task].

| [[spark.sql.streaming.pollingDelay]] `spark.sql.streaming.pollingDelay`
| `10` (millis)
| (internal) Time delay before `StreamExecution` link:spark-sql-streaming-MicroBatchExecution.adoc#runBatches-batchRunner-no-data[polls for new data when no data was available in a batch].
| `spark.sql.streaming.stateStore.providerClass`

| [[spark.sql.streaming.stateStore.maintenanceInterval]] `spark.sql.streaming.stateStore.maintenanceInterval`
| `60s`
| The initial delay and how often to execute StateStore's link:spark-sql-streaming-StateStore.adoc#MaintenanceTask[maintenance task].
<<spark-sql-streaming-HDFSBackedStateStoreProvider.adoc#, org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider>>
| [[spark.sql.streaming.stateStore.providerClass]] *(internal)* The fully-qualified class name to manage state data in stateful streaming queries. This class must be a subclass of link:spark-sql-streaming-StateStoreProvider.adoc[StateStoreProvider], and must have a zero-arg constructor.

| [[spark.sql.streaming.stateStore.providerClass]] `spark.sql.streaming.stateStore.providerClass`
| link:spark-sql-streaming-HDFSBackedStateStoreProvider.adoc[org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider]
| (internal) The fully-qualified class name to manage state data in stateful streaming queries. This class must be a subclass of link:spark-sql-streaming-StateStoreProvider.adoc[StateStoreProvider], and must have a zero-arg constructor.
| `spark.sql.streaming.unsupportedOperationCheck`

| [[spark.sql.streaming.unsupportedOperationCheck]] `spark.sql.streaming.unsupportedOperationCheck`
| `true`
| (internal) When enabled (i.e. `true`), `StreamingQueryManager` link:spark-sql-streaming-UnsupportedOperationChecker.adoc#checkForStreaming[makes sure that the logical plan of a streaming query uses supported operations only].
`true`
| [[spark.sql.streaming.unsupportedOperationCheck]] *(internal)* When enabled (`true`), `StreamingQueryManager` link:spark-sql-streaming-UnsupportedOperationChecker.adoc#checkForStreaming[makes sure that the logical plan of a streaming query uses supported operations only].
|===

0 comments on commit bd0b84c

Please sign in to comment.