
Trigger — How Frequently to Check Sources For New Data

Trigger defines how frequently a streaming query should be executed and therefore emit new data. StreamExecution uses the trigger to resolve a TriggerExecutor.

Note
A trigger can also be called a batch interval (as in the older Spark Streaming).
Table 1. Triggers

Trigger             Creating Instance
------------------  -----------------------------------------
ContinuousTrigger
ProcessingTime      Trigger.ProcessingTime(long intervalMs)
                    Trigger.ProcessingTime(Duration interval)
                    Trigger.ProcessingTime(String interval)
OneTimeTrigger

Note
You specify the trigger for a streaming query using DataStreamWriter's trigger method.
import org.apache.spark.sql.streaming.Trigger
val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.Once). // <-- execute once and stop
  queryName("rate-once").
  start

scala> query.isActive
res0: Boolean = false

scala> println(query.lastProgress)
{
  "id" : "2ae4b0a4-434f-4ca7-a523-4e859c07175b",
  "runId" : "24039ce5-906c-4f90-b6e7-bbb3ec38a1f5",
  "name" : "rate-once",
  "timestamp" : "2017-07-04T18:39:35.998Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1365,
    "getBatch" : 29,
    "getOffset" : 0,
    "queryPlanning" : 285,
    "triggerExecution" : 1742,
    "walCommit" : 40
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : null,
    "endOffset" : 0,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@7dbf277"
  }
}
Note
Although Trigger allows for custom implementations, StreamExecution refuses such attempts and reports an IllegalStateException.
case object MyTrigger extends Trigger
scala> val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(MyTrigger). // <-- use custom trigger
  queryName("rate-once").
  start
java.lang.IllegalStateException: Unknown type of trigger: MyTrigger
  at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:178)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
  ... 57 elided
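
The rejection above can be pictured as a pattern match over the known trigger types with a catch-all case. The following is only a minimal sketch under that assumption: every type in it is a simplified stand-in defined for this example (none of them are Spark's own classes), and the real StreamExecution may be organized differently.

```scala
object TriggerResolutionSketch {
  // Stand-in executor types (hypothetical, not Spark's classes)
  sealed trait TriggerExecutor
  final case class ProcessingTimeExecutor(intervalMs: Long) extends TriggerExecutor
  case object OneTimeExecutor extends TriggerExecutor

  // Stand-in trigger types; the trait is open, so anyone can extend it
  trait Trigger
  final case class ProcessingTime(intervalMs: Long) extends Trigger
  case object OneTimeTrigger extends Trigger
  case object MyTrigger extends Trigger // a custom, unsupported trigger

  // Map a trigger to its executor; anything unrecognized is rejected
  def resolve(trigger: Trigger): TriggerExecutor = trigger match {
    case ProcessingTime(ms) => ProcessingTimeExecutor(ms)
    case OneTimeTrigger     => OneTimeExecutor
    case other =>
      throw new IllegalStateException(s"Unknown type of trigger: $other")
  }
}
```

In this sketch, TriggerResolutionSketch.resolve(TriggerResolutionSketch.MyTrigger) throws an IllegalStateException, which mirrors why extending Trigger alone is not enough to make a custom trigger usable.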

ProcessingTime

ProcessingTime is a Trigger that executes a streaming query at a fixed interval, with milliseconds as the smallest time unit.
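
To make the millisecond arithmetic concrete, here is a minimal sketch of how the next execution time could be derived from an interval. It assumes that executions are aligned to multiples of the interval and that a zero interval means "run again immediately". The object and method names are hypothetical and chosen for this example only.

```scala
object NextTriggerSketch {
  // Returns the time (in ms) at which the next execution should start.
  // Assumption: trigger times fall on multiples of intervalMs.
  def nextTriggerTime(nowMs: Long, intervalMs: Long): Long = {
    require(intervalMs >= 0, "the interval must not be negative")
    if (intervalMs == 0) nowMs // zero interval: no waiting between executions
    else nowMs / intervalMs * intervalMs + intervalMs // next interval boundary after now
  }
}
```

With a 1000 ms interval, a query that finishes a batch at 1250 ms would wait until 2000 ms under this model rather than until 2250 ms.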

You can create an instance of ProcessingTime using the following constructors:

  • ProcessingTime(Long) that accepts non-negative values that represent milliseconds.

    ProcessingTime(10)
  • ProcessingTime(interval: String) or ProcessingTime.create(interval: String) that accept an interval string in the CalendarInterval format, with or without the leading "interval" keyword.

    ProcessingTime("10 milliseconds")
    ProcessingTime("interval 10 milliseconds")
  • ProcessingTime(Duration) that accepts scala.concurrent.duration.Duration instances (the 10.seconds syntax requires import scala.concurrent.duration._).

    ProcessingTime(10.seconds)
  • ProcessingTime.create(interval: Long, unit: TimeUnit) for Long and java.util.concurrent.TimeUnit instances.

    ProcessingTime.create(10, TimeUnit.SECONDS)
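
All four forms above describe the same thing, an interval in milliseconds. The sketch below shows one way such inputs can be normalized to a Long number of milliseconds. It is an illustration, not Spark's implementation: the IntervalSketch object and its methods are hypothetical, and the string form is parsed with Scala's own Duration parser as a stand-in for CalendarInterval parsing, so it accepts a somewhat different set of strings than Spark does.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

object IntervalSketch {
  // Long form: the value is already in milliseconds; negatives are rejected
  def fromMillis(intervalMs: Long): Long = {
    require(intervalMs >= 0, "the interval of trigger should not be negative")
    intervalMs
  }

  // Duration form: convert to milliseconds
  def fromDuration(interval: Duration): Long = fromMillis(interval.toMillis)

  // (Long, TimeUnit) form: let TimeUnit do the conversion
  def fromTimeUnit(interval: Long, unit: TimeUnit): Long =
    fromMillis(unit.toMillis(interval))

  // String form: drop an optional leading "interval" keyword, then parse.
  // Assumption: scala.concurrent.duration.Duration stands in for CalendarInterval.
  def fromString(interval: String): Long = {
    val trimmed = interval.trim
    val prefix = "interval "
    val body =
      if (trimmed.toLowerCase.startsWith(prefix)) trimmed.drop(prefix.length)
      else trimmed
    fromDuration(Duration(body))
  }
}
```

Under these assumptions, "10 milliseconds" and "interval 10 milliseconds" both normalize to 10, while a Duration of 10 seconds and the pair (10, TimeUnit.SECONDS) both normalize to 10000.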