Commit

StreamingQueryProgress
jaceklaskowski committed Sep 11, 2017
1 parent a421776 commit 327161a
Showing 6 changed files with 83 additions and 16 deletions.
1 change: 1 addition & 0 deletions SUMMARY.adoc
@@ -66,6 +66,7 @@
== Monitoring

. link:spark-sql-streaming-StreamingQueryListener.adoc[StreamingQueryListener -- Intercepting Streaming Events]
+.. link:spark-sql-streaming-StreamingQueryProgress.adoc[StreamingQueryProgress]

. link:spark-sql-streaming-webui.adoc[Web UI]

12 changes: 4 additions & 8 deletions spark-sql-streaming-ProgressReporter.adoc
@@ -100,7 +100,7 @@ res3: java.util.Map[String,Long] = {triggerExecution=60, queryPlanning=1, getBat
| Timestamp of when the last batch/trigger started

| [[progressBuffer]] `progressBuffer`
-| Scala's http://www.scala-lang.org/api/2.11.11/index.html#scala.collection.mutable.Queue[scala.collection.mutable.Queue] of <<StreamingQueryProgress, StreamingQueryProgress>>
+| Scala's http://www.scala-lang.org/api/2.11.11/index.html#scala.collection.mutable.Queue[scala.collection.mutable.Queue] of link:spark-sql-streaming-StreamingQueryProgress.adoc[StreamingQueryProgress]

Elements are added and removed when `ProgressReporter` <<updateProgress, updates progress>>.

@@ -124,10 +124,6 @@ CAUTION: FIXME

CAUTION: FIXME

-=== [[StreamingQueryProgress]] StreamingQueryProgress
-
-CAUTION: FIXME
-
=== [[contract]] ProgressReporter Contract

[source, scala]
@@ -223,9 +219,9 @@ Used when:
updateProgress(newProgress: StreamingQueryProgress): Unit
----

-`updateProgress` records `StreamingQueryProgress` and posts a link:spark-sql-streaming-StreamingQueryListener.adoc#QueryProgressEvent[QueryProgressEvent] event.
+`updateProgress` records the input `newProgress` and posts a link:spark-sql-streaming-StreamingQueryListener.adoc#QueryProgressEvent[QueryProgressEvent] event.

-.ProgressReporter's Updating Progress
+.ProgressReporter's Reporting Query Progress
image::images/ProgressReporter-updateProgress.png[align="center"]

`updateProgress` adds the input `newProgress` to <<progressBuffer, progressBuffer>>.
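
The record-then-post behaviour of `updateProgress` could be sketched in plain Scala (no Spark required) as follows. This is only an illustration under stated assumptions: `Progress`, `SimpleProgressReporter`, the `retention` limit and the `post` callback are hypothetical names, and the rule used to remove old elements from the buffer is an assumption, not something the text above specifies.

```scala
import scala.collection.mutable

// Hypothetical stand-in for StreamingQueryProgress
final case class Progress(batchId: Long)

// Hypothetical, simplified reporter: keeps recent updates in a queue
// and hands every update to a callback (standing in for posting an event)
class SimpleProgressReporter(retention: Int, post: Progress => Unit) {
  private val progressBuffer = mutable.Queue.empty[Progress]

  def updateProgress(newProgress: Progress): Unit = {
    progressBuffer.synchronized {
      // Record the new update...
      progressBuffer += newProgress
      // ...and drop the oldest ones beyond the (assumed) retention limit
      while (progressBuffer.length > retention) progressBuffer.dequeue()
    }
    // Notify whoever is interested in the update
    post(newProgress)
  }

  def recentProgress: List[Progress] =
    progressBuffer.synchronized(progressBuffer.toList)
}

object ProgressBufferDemo {
  // Reports four updates with a retention of two and returns
  // (batch ids still buffered, batch ids posted)
  def run(): (List[Long], List[Long]) = {
    val posted = mutable.ListBuffer.empty[Long]
    val reporter = new SimpleProgressReporter(2, p => { posted += p.batchId; () })
    (0L to 3L).foreach(id => reporter.updateProgress(Progress(id)))
    (reporter.recentProgress.map(_.batchId), posted.toList)
  }
}
```

In this sketch every update is posted, while only the most recent ones stay in the buffer.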
@@ -295,7 +291,7 @@ DEBUG StreamExecution: Execution stats: [executionStats]

`finishTrigger` creates a <<SinkProgress, SinkProgress>> (aka sink statistics) for the <<sink, sink>>.

-`finishTrigger` creates <<StreamingQueryProgress, StreamingQueryProgress>>.
+`finishTrigger` creates a link:spark-sql-streaming-StreamingQueryProgress.adoc[StreamingQueryProgress].

If there was any data (using the input `hasNewData` flag), `finishTrigger` resets <<lastNoDataProgressEventTime, lastNoDataProgressEventTime>> (i.e. becomes the minimum possible time) and <<updateProgress, updates query progress>>.

4 changes: 2 additions & 2 deletions spark-sql-streaming-StateStoreSaveExec.adoc
@@ -147,7 +147,7 @@ NOTE: `StateStoreSaveExec` <<doExecute, behaves>> differently per output mode.
| number of total state rows
| Number of the state keys in the link:spark-sql-streaming-StateStore.adoc[state store]

-Corresponds to `numRowsTotal` in `stateOperators` in link:spark-sql-streaming-ProgressReporter.adoc#StreamingQueryProgress[StreamingQueryProgress] (and is available as `sq.lastProgress.stateOperators(0).numRowsTotal` for ``0``th operator).
+Corresponds to `numRowsTotal` in `stateOperators` in link:spark-sql-streaming-StreamingQueryProgress.adoc[StreamingQueryProgress] (and is available as `sq.lastProgress.stateOperators(0).numRowsTotal` for ``0``th operator).

| [[numUpdatedStateRows]] `numUpdatedStateRows`
| number of updated state rows
@@ -163,7 +163,7 @@ CAUTION: FIXME
CAUTION: FIXME

-NOTE: You can see the current value as `numRowsUpdated` attribute in `stateOperators` in link:spark-sql-streaming-ProgressReporter.adoc#StreamingQueryProgress[StreamingQueryProgress] (that is available as `StreamingQuery.lastProgress.stateOperators(n).numRowsUpdated` for ``n``th operator).
+NOTE: You can see the current value as `numRowsUpdated` attribute in `stateOperators` in link:spark-sql-streaming-StreamingQueryProgress.adoc[StreamingQueryProgress] (that is available as `StreamingQuery.lastProgress.stateOperators(n).numRowsUpdated` for ``n``th operator).

| [[stateMemory]] `stateMemory`
| memory used by state
10 changes: 5 additions & 5 deletions spark-sql-streaming-StreamingQuery.adoc
@@ -51,13 +51,13 @@ trait StreamingQuery {
| Description

| [[name]] `name`
-| Query name that is unique across all active queries
+| Optional query name that is unique across all active queries

| [[id]] `id`
-| `UUID`
+| Unique identifier of a streaming query

| [[runId]] `runId`
-| `UUID`
+| Unique identifier of the current execution of a streaming query

| [[sparkSession]] `sparkSession`
| `SparkSession`
@@ -72,10 +72,10 @@ trait StreamingQuery {
| `StreamingQueryStatus`

| [[recentProgress]] `recentProgress`
-| Collection of recent `StreamingQueryProgress`
+| Collection of recent link:spark-sql-streaming-StreamingQueryProgress.adoc[StreamingQueryProgress] updates.

| [[lastProgress]] `lastProgress`
-| The last `StreamingQueryProgress`
+| The last link:spark-sql-streaming-StreamingQueryProgress.adoc[StreamingQueryProgress] update.

| [[awaitTermination]] `awaitTermination`
|
2 changes: 1 addition & 1 deletion spark-sql-streaming-StreamingQueryListener.adoc
@@ -33,7 +33,7 @@ a| [[QueryStartedEvent]] `QueryStartedEvent`

a| [[QueryProgressEvent]] `QueryProgressEvent`

-- `StreamingQueryProgress`
+- link:spark-sql-streaming-StreamingQueryProgress.adoc[StreamingQueryProgress]
| [[onQueryProgress]] onQueryProgress
| `ProgressReporter` link:spark-sql-streaming-ProgressReporter.adoc#updateProgress[reports query progress] (which is when `StreamExecution` link:spark-sql-streaming-StreamExecution.adoc#runBatches[runs batches] and a trigger has finished).
70 changes: 70 additions & 0 deletions spark-sql-streaming-StreamingQueryProgress.adoc
@@ -0,0 +1,70 @@
== [[StreamingQueryProgress]] StreamingQueryProgress

`StreamingQueryProgress` holds information about the progress of a streaming query.

`StreamingQueryProgress` is created exclusively when `StreamExecution` link:spark-sql-streaming-ProgressReporter.adoc#finishTrigger[finishes a trigger].

[NOTE]
====
Use link:spark-sql-streaming-StreamingQuery.adoc#lastProgress[lastProgress] property of a `StreamingQuery` to access the most recent `StreamingQueryProgress` update.
[source, scala]
----
val sq: StreamingQuery = ...
sq.lastProgress
----
====

[NOTE]
====
Use link:spark-sql-streaming-StreamingQuery.adoc#recentProgress[recentProgress] property of a `StreamingQuery` to access the most recent `StreamingQueryProgress` updates.
[source, scala]
----
val sq: StreamingQuery = ...
sq.recentProgress
----
====

[NOTE]
====
Use link:spark-sql-streaming-StreamingQueryListener.adoc#QueryProgressEvent[StreamingQueryListener] to get notified about `StreamingQueryProgress` updates while a streaming query is executed.
====
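
As an illustration of the listener-based approach, the following sketch registers a `StreamingQueryListener` that prints a few `StreamingQueryProgress` properties after every finished trigger. It assumes a local Spark session, the built-in `rate` source and the `console` sink; the object name, the application name and the 10-second timeout are arbitrary choices made for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object ProgressListenerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("progress-listener-demo") // arbitrary name (assumption)
      .getOrCreate()

    // Register the listener before starting any query
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()

      // Called with a StreamingQueryProgress update after a trigger has finished
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val p = event.progress
        println(s"batchId=${p.batchId} timestamp=${p.timestamp} durationMs=${p.durationMs}")
      }

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })

    // A throwaway query: rate source to console sink
    val sq = spark.readStream.format("rate").load()
      .writeStream.format("console").start()

    // Let the query run for about 10 seconds, then shut down
    sq.awaitTermination(10000)
    sq.stop()
    spark.stop()
  }
}
```

Listener callbacks are invoked asynchronously, so the printed lines may interleave with the console sink's output.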

[[events]]
.StreamingQueryProgress's Properties
[cols="m,3",options="header",width="100%"]
|===
| Name
| Description

| id
| link:spark-sql-streaming-StreamingQuery.adoc#id[Unique identifier of a streaming query]

| runId
| link:spark-sql-streaming-StreamingQuery.adoc#runId[Unique identifier of the current execution of a streaming query]

| name
| link:spark-sql-streaming-StreamingQuery.adoc#name[Optional query name]

| timestamp
| Time when the trigger has started (in ISO8601 format).

| batchId
| Unique id of the current batch

| durationMs
| Durations of the internal phases (in milliseconds)

| eventTime
| Statistics of event time seen in this batch

| stateOperators
| Information about stateful operators in the query that store state.

| sources
| Statistics about the data read from every streaming source in a streaming query

| sink
| Information about progress made for a sink
|===
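
The properties above can be read directly off a `StreamingQueryProgress` instance. The sketch below is one possible way to do it, assuming a local Spark session with the built-in `rate` source and `console` sink; the object name, the query name `rate2console` and the 10-second wait are arbitrary choices for the example.

```scala
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

object LastProgressDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("last-progress-demo") // arbitrary name (assumption)
      .getOrCreate()

    val sq = spark.readStream.format("rate").load()
      .writeStream.format("console")
      .queryName("rate2console") // arbitrary query name (assumption)
      .start()

    // Give the query time to finish a few triggers
    sq.awaitTermination(10000)

    // lastProgress is null until the first progress update is available
    Option(sq.lastProgress).foreach { p =>
      println(s"id=${p.id} runId=${p.runId} name=${p.name}")
      println(s"timestamp=${p.timestamp} batchId=${p.batchId}")
      // durationMs and eventTime are Java maps
      p.durationMs.asScala.foreach { case (phase, ms) => println(s"  $phase: $ms ms") }
      p.eventTime.asScala.foreach { case (stat, value) => println(s"  eventTime $stat: $value") }
      // One entry per stateful operator (empty for this stateless query)
      p.stateOperators.zipWithIndex.foreach { case (op, n) =>
        println(s"  stateOperators($n): total=${op.numRowsTotal} updated=${op.numRowsUpdated}")
      }
      // One entry per streaming source
      p.sources.foreach(src => println(s"  source: ${src.description}"))
      println(s"  sink: ${p.sink.description}")
    }

    sq.stop()
    spark.stop()
  }
}
```

`p.prettyJson` prints all of the properties at once when a field-by-field breakdown is not needed.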
