Skip to content

Commit

Permalink
ContinuousExecution -- StreamExecution of StreamWriteSupport Sinks wi…
Browse files Browse the repository at this point in the history
…th ContinuousTrigger
  • Loading branch information
jaceklaskowski committed Nov 11, 2018
1 parent 836196d commit fce5037
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 37 deletions.
19 changes: 18 additions & 1 deletion spark-sql-streaming-ConsoleSinkProvider.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
== [[ConsoleSinkProvider]] ConsoleSinkProvider

`ConsoleSinkProvider` is a link:spark-sql-streaming-StreamSinkProvider.adoc[StreamSinkProvider] for link:spark-sql-streaming-ConsoleSink.adoc[ConsoleSink].
`ConsoleSinkProvider` is a `DataSourceV2` with <<spark-sql-streaming-StreamWriteSupport.adoc#, StreamWriteSupport>> for <<spark-sql-streaming-ConsoleSink.adoc#< ConsoleSink>>.

`ConsoleSinkProvider` is a link:spark-sql-DataSourceRegister.adoc[DataSourceRegister] that registers the `ConsoleSink` streaming sink as `console` format.

[source, scala]
----
import org.apache.spark.sql.streaming.Trigger
val q = spark
.readStream
.format("rate")
.load
.writeStream
.format("console") // <-- requests ConsoleSinkProvider for a sink
.trigger(Trigger.Once)
.start
scala> println(q.lastProgress.sink)
{
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@edde9bb"
}
----
43 changes: 32 additions & 11 deletions spark-sql-streaming-ContinuousExecution.adoc
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
== [[ContinuousExecution]] ContinuousExecution
== [[ContinuousExecution]] ContinuousExecution -- StreamExecution of StreamWriteSupport Sinks with ContinuousTrigger

`ContinuousExecution` is...FIXME
`ContinuousExecution` is a <<spark-sql-streaming-StreamExecution.adoc#, StreamExecution>> of a <<sink, StreamWriteSupport sink>> with a <<trigger, ContinuousTrigger>>.

`ContinuousExecution` is <<creating-instance, created>> exclusively when `DataStreamWriter` is requested to <<spark-sql-streaming-DataStreamWriter.adoc#start, start>> for a <<spark-sql-streaming-StreamWriteSupport.adoc#, StreamWriteSupport>> sink and a <<spark-sql-streaming-Trigger.adoc#ContinuousTrigger, ContinuousTrigger>> (when `StreamingQueryManager` is requested to <<spark-sql-streaming-StreamingQueryManager.adoc#createQuery, create a serializable StreamingQuery>>).
[source, scala]
----
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark
.readStream
.format("rate")
.load
.writeStream
.format("console")
.trigger(Trigger.Continuous(10.seconds))
.queryName("rate2console")
.option("truncate", false)
.start
scala> q.explain
== Physical Plan ==
WriteToContinuousDataSource ConsoleWriter[numRows=20, truncate=false]
+- *(1) Project [timestamp#758, value#759L]
+- *(1) ScanV2 rate[timestamp#758, value#759L]
// q.stop
----

`ContinuousExecution` is <<creating-instance, created>> when `StreamingQueryManager` is requested to <<spark-sql-streaming-StreamingQueryManager.adoc#createQuery, create a streaming query>> with a <<spark-sql-streaming-StreamWriteSupport.adoc#, StreamWriteSupport sink>> and a <<spark-sql-streaming-Trigger.adoc#ContinuousTrigger, ContinuousTrigger>> (when `DataStreamWriter` is requested to <<spark-sql-streaming-DataStreamWriter.adoc#start, start an execution of the streaming query>>).

[[internal-registries]]
.ContinuousExecution's Internal Properties (e.g. Registries, Counters and Flags)
Expand All @@ -21,9 +45,6 @@ Used when...FIXME

Used when...FIXME

| <<logicalPlan, logicalPlan>>
|

| `triggerExecutor`
a| [[triggerExecutor]] <<spark-sql-streaming-TriggerExecutor.adoc#, TriggerExecutor>> for the <<trigger, Trigger>>:

Expand Down Expand Up @@ -114,7 +135,7 @@ NOTE: `runActivatedStream` is part of <<spark-sql-streaming-StreamExecution.adoc

`runActivatedStream`...FIXME

=== [[runContinuous]] `runContinuous` Internal Method
=== [[runContinuous]] Running Streaming Query -- `runContinuous` Internal Method

[source, scala]
----
Expand All @@ -123,21 +144,21 @@ runContinuous(sparkSessionForQuery: SparkSession): Unit

`runContinuous`...FIXME

NOTE: `runContinuous` is used exclusively when `ContinuousExecution` is requested to <<runActivatedStream, runActivatedStream>>.
NOTE: `runContinuous` is used exclusively when `ContinuousExecution` is requested to <<runActivatedStream, run the activated streaming query>>.

=== [[creating-instance]] Creating ContinuousExecution Instance

`ContinuousExecution` takes the following when created:

* [[sparkSession]] `SparkSession`
* [[name]] Query name
* [[name]] The name of the structured query
* [[checkpointRoot]] Path to the checkpoint directory (aka _metadata directory_)
* [[analyzedPlan]] Analyzed logical query plan (i.e. `LogicalPlan`)
* [[analyzedPlan]] Analyzed logical query plan (`LogicalPlan`)
* [[sink]] <<spark-sql-streaming-StreamWriteSupport.adoc#, StreamWriteSupport>>
* [[trigger]] <<spark-sql-streaming-Trigger.adoc#, Trigger>>
* [[triggerClock]] `Clock`
* [[outputMode]] <<spark-sql-streaming-OutputMode.adoc#, Output mode>>
* [[extraOptions]] Map[String, String]
* [[extraOptions]] Options (`Map[String, String]`)
* [[deleteCheckpointOnStop]] `deleteCheckpointOnStop` flag to control whether to delete the checkpoint directory on stop

`ContinuousExecution` initializes the <<internal-registries, internal registries and counters>>.
Expand Down
38 changes: 16 additions & 22 deletions spark-sql-streaming-StreamExecution.adoc
Original file line number Diff line number Diff line change
@@ -1,22 +1,10 @@
== [[StreamExecution]] StreamExecution -- Base of Streaming Query Executions

`StreamExecution` is the <<contract, base>> of <<extensions, streaming query executions>> that can execute <<logicalPlan, structured query>> continuously and concurrently (as a <<queryExecutionThread, stream execution thread>>).
`StreamExecution` is the <<contract, base>> of <<extensions, streaming query executions>> that can <<runActivatedStream, execute>> the <<logicalPlan, structured query>> continuously on a <<queryExecutionThread, stream execution thread>>.

NOTE: *Continuous query*, *streaming query*, *continuous Dataset*, *streaming Dataset* are synonyms, and `StreamExecution` uses <<logicalPlan, analyzed logical plan>> internally to refer to it.

[[contract]]
[source, scala]
----
package org.apache.spark.sql.execution.streaming
abstract class StreamExecution(...) extends ... {
// only required properties (vals and methods) that have no implementation
// the others follow
def logicalPlan: LogicalPlan
def runActivatedStream(sparkSessionForStream: SparkSession): Unit
}
----

.(Subset of) StreamExecution Contract
[cols="1m,2",options="header",width="100%"]
|===
Expand All @@ -26,14 +14,24 @@ abstract class StreamExecution(...) extends ... {
| logicalPlan
a| [[logicalPlan]] <<spark-sql-streaming-ProgressReporter.adoc#logicalPlan, LogicalPlan>>

[source, scala]
----
logicalPlan: LogicalPlan
----

NOTE: `logicalPlan` is part of <<spark-sql-streaming-ProgressReporter.adoc#logicalPlan, ProgressReporter Contract>> to...FIXME.

Used when `StreamExecution` is requested to <<runStream, runStream>> and <<toDebugString, toDebugString>>

| runActivatedStream
| [[runActivatedStream]] Running the activated streaming query
a| [[runActivatedStream]] Running the activated streaming query

[source, scala]
----
runActivatedStream(sparkSessionForStream: SparkSession): Unit
----

Used exclusively when `StreamExecution` is requested to <<runStream, runStream>> (when in `INITIALIZING` state)
Used exclusively when `StreamExecution` is requested to <<runStream, run the streaming query>> (when transitioning from the `INITIALIZING` state to `ACTIVE`)
|===

[[extensions]]
Expand Down Expand Up @@ -523,7 +521,7 @@ CAUTION: FIXME Describe `catch` block for exception handling
[[runStream-finally]]
CAUTION: FIXME Describe `finally` block for query termination

NOTE: `runStream` is used exclusively when `StreamExecution` is requested to start the <<queryExecutionThread, stream execution thread for a streaming query>>.
NOTE: `runStream` is used exclusively when the <<queryExecutionThread, stream execution thread>> is requested to start.

==== [[runBatches-batch-runner]] TriggerExecutor's Batch Runner

Expand Down Expand Up @@ -726,20 +724,16 @@ In the end, `processAllAvailable` releases <<awaitProgressLock, awaitProgressLoc
queryExecutionThread: QueryExecutionThread
----

`queryExecutionThread` is a Java thread of execution (https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html[java.util.Thread]) that <<runStream, runs a structured query>>.
`queryExecutionThread` is a Java thread of execution (https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html[java.util.Thread]) that <<runStream, runs the structured query>> when started.

`queryExecutionThread` uses the name *stream execution thread for [id]* (that uses <<prettyIdString, prettyIdString>> for the id).
`queryExecutionThread` uses the name *stream execution thread for [id]* (that uses <<prettyIdString, prettyIdString>> for the id, i.e. `queryName [id = [id], runId = [runId]]`).

`queryExecutionThread` is a <<spark-sql-streaming-QueryExecutionThread.adoc#, QueryExecutionThread>> (that is a Spark `UninterruptibleThread` with `runUninterruptibly` method for running a block of code without being interrupted by `Thread.interrupt()`).

`queryExecutionThread` is started (as a daemon thread) when `StreamExecution` is requested to <<start, start>>.

When started, `queryExecutionThread` sets the thread-local properties as the <<callSite, call site>> and <<runBatches, runs the streaming query>>.

When `StreamExecution` finishes <<runStream-finally, running the streaming query>>, it uses `queryExecutionThread` to execute the `runUninterruptibly` code block uninterruptibly.

`queryExecutionThread` is also used when `StreamExecution` is requested to <<stop, stop>>

[TIP]
====
Use Java's http://docs.oracle.com/javase/8/docs/technotes/guides/management/jconsole.html[jconsole] or https://docs.oracle.com/javase/8/docs/technotes/tools/unix/jstack.html[jstack] to monitor the streaming threads.
Expand Down
6 changes: 3 additions & 3 deletions spark-sql-streaming-Trigger.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ NOTE: A trigger can also be called a *batch interval* (as in the older Spark Str
[[available-implementations]]
[[available-triggers]]
.Triggers
[cols="1,2",options="header",width="100%"]
[cols="1m,2",options="header",width="100%"]
|===
| Trigger
| Creating Instance

| `ContinuousTrigger`
| ContinuousTrigger
| [[ContinuousTrigger]]

.3+.^| <<ProcessingTime, ProcessingTime>>
| `Trigger.ProcessingTime(long intervalMs)`
| `Trigger.ProcessingTime(Duration interval)`
| `Trigger.ProcessingTime(String interval)`

| `OneTimeTrigger`
| OneTimeTrigger
| [[OneTimeTrigger]]
|===

Expand Down

0 comments on commit fce5037

Please sign in to comment.