StreamingQueryManager — Streaming Query Management

StreamingQueryManager is the management interface for all continuous structured queries of a SparkSession. It is available using the SparkSession.streams operator.

val spark: SparkSession = ...
val queries = spark.streams

StreamingQueryManager is created when…​FIXME

Table 1. StreamingQueryManager’s Internal Registries and Counters (in alphabetical order)

Name                    Description
activeQueries           Registry of StreamingQueryWrapper per id. Used in active, get, startQuery and notifyQueryTermination.
listenerBus
stateStoreCoordinator

notifyQueryTermination Internal Method

Caution
FIXME

Creating StreamingQueryWrapper (Serializable StreamingQuery) — createQuery Internal Method

createQuery(
  userSpecifiedName: Option[String],
  userSpecifiedCheckpointLocation: Option[String],
  df: DataFrame,
  sink: Sink,
  outputMode: OutputMode,
  useTempCheckpointLocation: Boolean,
  recoverFromCheckpointLocation: Boolean,
  trigger: Trigger,
  triggerClock: Clock): StreamingQueryWrapper
Caution
FIXME
Note

The recoverFromCheckpointLocation flag corresponds to the recoverFromCheckpointLocation flag that StreamingQueryManager uses to start a streaming query (startQuery), where it is enabled by default. startQuery is in fact the only place where createQuery is used. The value of the flag depends on the sink:

  • memory sink has the flag enabled for Complete output mode only

  • foreach sink has the flag always enabled

  • console sink has the flag always disabled

  • all other sinks have the flag always enabled
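The per-sink rules above can be summarized as a small decision function. This is a minimal sketch that only restates the list: `RecoverFlag` and `recoverFromCheckpoint` are hypothetical names introduced here, and keying the rule on the sink's format string is an assumption, not something the list itself says.

```scala
// Hypothetical helper that restates the bullet list above; it is not Spark code.
object RecoverFlag {
  /** Returns the recoverFromCheckpointLocation value for a sink format. */
  def recoverFromCheckpoint(sinkFormat: String, isCompleteMode: Boolean): Boolean =
    sinkFormat match {
      case "memory"  => isCompleteMode // enabled for Complete output mode only
      case "console" => false          // always disabled
      case _         => true           // foreach and all other sinks: always enabled
    }
}
```

For example, `RecoverFlag.recoverFromCheckpoint("memory", isCompleteMode = false)` evaluates to `false`, while any sink other than memory and console yields `true` regardless of the output mode.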

Note
userSpecifiedName corresponds to the queryName option (which can be defined using DataStreamWriter's queryName method), while userSpecifiedCheckpointLocation corresponds to the checkpointLocation option.
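As an illustration of where the two values come from, the sketch below sets both options on a DataStreamWriter before starting the query. It assumes a Spark version that ships the `rate` source (2.2 or later) and uses the console sink; the object name `NamedQueryExample`, the query name and the `checkpointDir` parameter are made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object NamedQueryExample {
  // Starts a query whose name and checkpoint location are set explicitly.
  // The "rate" source, the query name and checkpointDir are illustrative assumptions.
  def start(spark: SparkSession, checkpointDir: String): StreamingQuery =
    spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("console")
      .queryName("rate-to-console")                // the queryName option
      .option("checkpointLocation", checkpointDir) // the checkpointLocation option
      .start()
}
```

The returned StreamingQuery reports the chosen name through its `name` attribute and should be stopped with `stop()` once it is no longer needed.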
Note
createQuery is used exclusively when StreamingQueryManager starts executing a streaming query.

Initialization

StreamingQueryManager manages the following instances:

Starting Execution of Streaming Query — startQuery Internal Method

startQuery(
  userSpecifiedName: Option[String],
  userSpecifiedCheckpointLocation: Option[String],
  df: DataFrame,
  sink: Sink,
  outputMode: OutputMode,
  useTempCheckpointLocation: Boolean = false,
  recoverFromCheckpointLocation: Boolean = true,
  trigger: Trigger = ProcessingTime(0),
  triggerClock: Clock = new SystemClock()): StreamingQuery

startQuery starts a streaming query.

Note
trigger defaults to 0 milliseconds (as ProcessingTime(0)).

Internally, startQuery first creates a streaming query, registers it in activeQueries internal registry and starts the query.

In the end, startQuery returns the query (as part of the fluent API so you can chain operators) or reports the exception thrown while starting the query.

startQuery reports an IllegalArgumentException when another query is already registered under the same name. startQuery looks the name up in the activeQueries internal registry.

Cannot start query with name [name] as a query with that name is already active

startQuery reports an IllegalStateException when a query with the same id is already active, e.g. when an active query is started again from its checkpoint. startQuery looks the id up in the activeQueries internal registry.

Cannot start query with id [id] as another query with same id is already active.
Perhaps you are attempting to restart a query from checkpoint that is already active.
Note
startQuery is used exclusively when DataStreamWriter is started.
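The register-and-check behaviour described in this section can be modelled without Spark. The following is a simplified sketch, not the actual implementation: `QueryStub` and `ActiveQueryRegistry` are hypothetical names, the query is reduced to an id and an optional name, and actually starting the query is left out.

```scala
import java.util.UUID
import scala.collection.mutable

// Simplified stand-in for a streaming query; not a Spark class.
final case class QueryStub(id: UUID, name: Option[String])

// Hypothetical model of the activeQueries registry and the checks described above.
class ActiveQueryRegistry {
  private val activeQueries = mutable.HashMap.empty[UUID, QueryStub]

  def startQuery(query: QueryStub): QueryStub = activeQueries.synchronized {
    // Reject a second query with the same user-specified name.
    query.name.foreach { name =>
      if (activeQueries.values.exists(_.name.contains(name))) {
        throw new IllegalArgumentException(
          s"Cannot start query with name $name as a query with that name is already active")
      }
    }
    // Reject a query whose id is already registered (e.g. restarted from checkpoint).
    if (activeQueries.contains(query.id)) {
      throw new IllegalStateException(
        s"Cannot start query with id ${query.id} as another query with same id is already active.")
    }
    activeQueries.put(query.id, query)
    query // handed back to the caller
  }

  // Snapshot of the currently registered queries.
  def active: Seq[QueryStub] = activeQueries.synchronized(activeQueries.values.toList)
}
```

Registering two stubs with the name `hello` fails on the second call with IllegalArgumentException, and registering a stub that reuses an existing id fails with IllegalStateException, while the registry keeps only the first entry.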

Returning All Active Continuous Queries per SQLContext

active: Array[StreamingQuery]

active method returns a collection of StreamingQuery instances for the current SQLContext.

Getting Active Continuous Query By Name

get(name: String): StreamingQuery

get method returns a StreamingQuery by name.

It may throw an IllegalArgumentException when no StreamingQuery exists for the name.

java.lang.IllegalArgumentException: There is no active query with name hello
  at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
  at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at scala.collection.AbstractMap.getOrElse(Map.scala:59)
  at org.apache.spark.sql.StreamingQueryManager.get(StreamingQueryManager.scala:58)
  ... 49 elided
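Both lookups can be exercised from application code through `spark.streams`. The sketch below lists the active queries and looks a query up by name by filtering `active`, which returns an `Option` instead of throwing when nothing matches. `QueryLookup`, `describeActive` and `findByName` are hypothetical helpers written for this example, not part of StreamingQueryManager.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object QueryLookup {
  // One descriptive line per active query of the session.
  def describeActive(spark: SparkSession): Seq[String] =
    spark.streams.active.toSeq.map(q => s"id=${q.id} name=${q.name} active=${q.isActive}")

  // Name lookup that returns None instead of throwing when nothing matches.
  def findByName(spark: SparkSession, name: String): Option[StreamingQuery] =
    spark.streams.active.find(_.name == name)
}
```

With no query named `hello` running, `QueryLookup.findByName(spark, "hello")` is `None`, which avoids the exception shown in the stack trace above.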

StreamingQueryListener Management - Adding or Removing Listeners

  • addListener(listener: StreamingQueryListener): Unit adds listener to the internal listenerBus.

  • removeListener(listener: StreamingQueryListener): Unit removes listener from the internal listenerBus.

postListenerEvent

postListenerEvent(event: StreamingQueryListener.Event): Unit

postListenerEvent posts a StreamingQueryListener.Event to listenerBus.

StreamingQueryListener

Caution
FIXME

StreamingQueryListener is an interface for listening to query life cycle events, i.e. query start, progress and termination events.
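A listener for the three life cycle events could look like the sketch below, registered and removed with addListener and removeListener. It assumes the event class names used since Spark 2.1 (`QueryStartedEvent`, `QueryProgressEvent`, `QueryTerminatedEvent`); `LoggingListener` and `ListenerExample` are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener that prints every life cycle event it receives.
class LoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"started: id=${event.id} name=${event.name}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"progress: ${event.progress.numInputRows} input rows")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"terminated: id=${event.id} exception=${event.exception}")
}

object ListenerExample {
  // Adds a new listener to the session's StreamingQueryManager and returns it.
  def register(spark: SparkSession): LoggingListener = {
    val listener = new LoggingListener
    spark.streams.addListener(listener)
    listener
  }

  // Removes a previously registered listener.
  def unregister(spark: SparkSession, listener: LoggingListener): Unit =
    spark.streams.removeListener(listener)
}
```

Keeping the reference returned by `register` matters because removeListener needs the same instance that was added.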

lastTerminatedQuery - internal barrier

Caution
FIXME Why is lastTerminatedQuery needed?

Used in:

  • awaitAnyTermination

  • awaitAnyTermination(timeoutMs: Long)

Both wait 10 milliseconds at a time before checking whether lastTerminatedQuery is non-null.

It is set in:

  • resetTerminated() resets lastTerminatedQuery, i.e. sets it to null.

  • notifyQueryTermination(terminatedQuery: StreamingQuery) sets lastTerminatedQuery to be terminatedQuery and notifies all the threads that wait on awaitTerminationLock.

    It is called from StreamExecution.runBatches.
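The interplay of lastTerminatedQuery, awaitTerminationLock and the 10-millisecond wait can be sketched as a plain wait/notify barrier. This is a Spark-free model under stated assumptions: `TerminationBarrier` is a hypothetical class, the terminated query is represented by a `String`, and the loop structure is a guess at how the waiting could be arranged rather than a copy of the real code.

```scala
// Hypothetical, Spark-free model of the lastTerminatedQuery barrier.
class TerminationBarrier {
  private val awaitTerminationLock = new Object
  private var lastTerminatedQuery: String = null // guarded by awaitTerminationLock

  // Waits in 10 ms slices until a termination is recorded or the timeout expires.
  // Returns true if a terminated query was observed.
  def awaitAnyTermination(timeoutMs: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    awaitTerminationLock.synchronized {
      while (lastTerminatedQuery == null && System.currentTimeMillis() < deadline) {
        awaitTerminationLock.wait(10)
      }
      lastTerminatedQuery != null
    }
  }

  // Records the terminated query and wakes up all waiting threads.
  def notifyQueryTermination(terminatedQuery: String): Unit =
    awaitTerminationLock.synchronized {
      lastTerminatedQuery = terminatedQuery
      awaitTerminationLock.notifyAll()
    }

  // Clears the recorded termination so later waits block again.
  def resetTerminated(): Unit =
    awaitTerminationLock.synchronized { lastTerminatedQuery = null }
}
```

In this model a waiter that arrives after a termination returns immediately until resetTerminated is called, which is one plausible reason for keeping the last terminated query around.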

Creating StreamingQueryManager Instance

StreamingQueryManager takes the following when created:

  • SparkSession

StreamingQueryManager initializes the internal registries and counters.