Streaming Operators / Streaming Dataset API

The Dataset API has a set of operators that are of particular use in Structured Streaming.

Table 1. Streaming Operators
Operator Description

dropDuplicates

Drops duplicate records (optionally considering only a given subset of columns); see the sketch after the demo below

dropDuplicates(): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(col1: String, cols: String*): Dataset[T]

explain

Explains execution plans

explain(): Unit
explain(extended: Boolean): Unit

groupBy

Aggregates rows by an untyped grouping function (i.e. by columns)

groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset

groupByKey

Aggregates records by a typed grouping function

groupByKey(func: T => K): KeyValueGroupedDataset[K, T]

withWatermark

Defines a streaming watermark on an event time column

scala> spark.version
res0: String = 2.3.0-SNAPSHOT

// input stream
val rates = spark.
  readStream.
  format("rate").
  option("rowsPerSecond", 1).
  load

// stream processing: a streaming aggregation (count of rows per value bucket)
// (an aggregation is required for the Complete output mode used below)
val counts = rates.
  groupByKey(r => r.getLong(1) % 10). // <-- value is the 2nd column of the rate source
  count

// output stream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = counts.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Complete).
  queryName("rate-console").
  start

// eventually...
sq.stop
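
The dropDuplicates variants from the table above work on a streaming Dataset, too. The following is a minimal sketch that reuses the rates stream, deduplicates records on the value and timestamp columns, and uses explain to print the query plans; the 10-second watermark is an arbitrary choice for illustration.

// streaming deduplication on a subset of columns (a sketch)
val deduped = rates.
  withWatermark("timestamp", "10 seconds"). // <-- assumed delay threshold
  dropDuplicates("value", "timestamp")      // <-- include the event-time column so old state can be dropped

// print the logical and physical plans (the explain operator from the table above)
deduped.explain(extended = true)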

Streaming Aggregation (using Grouping Function) — groupByKey Operator

groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

groupByKey creates a KeyValueGroupedDataset in which the keys are unique and every key is associated with the collection of rows (one or more) that map to it.

Note
The type of the input argument of func is the type of the rows in the Dataset, i.e. T for a Dataset[T].
scala> spark.version
res0: String = 2.3.0-SNAPSHOT

// input stream
import java.sql.Timestamp
import org.apache.spark.sql.functions._ // <-- for rand
val signals = spark.
  readStream.
  format("rate").
  option("rowsPerSecond", 1).
  load.
  withColumn("value", $"value" % 10)  // <-- randomize the values (just for fun)
  withColumn("deviceId", lit(util.Random.nextInt(10))). // <-- 10 devices randomly assigned to values
  as[(Timestamp, Long, Int)] // <-- convert to a "better" type (from "unpleasant" Row)

// stream processing using groupByKey operator
// groupByKey(func: ((Timestamp, Long, Int)) => K): KeyValueGroupedDataset[K, (Timestamp, Long, Int)]
// K becomes Int which is a device id
val deviceId: ((Timestamp, Long, Int)) => Int = { case (_, _, deviceId) => deviceId }
scala> val signalsByDevice = signals.groupByKey(deviceId)
signalsByDevice: org.apache.spark.sql.KeyValueGroupedDataset[Int,(java.sql.Timestamp, Long, Int)] = org.apache.spark.sql.KeyValueGroupedDataset@19d40bc6
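
One possible continuation is to count the signals per device (using the typed count aggregation on KeyValueGroupedDataset) and print the running counts to the console sink; the query name and trigger are arbitrary choices.

// count the rows per device and write the running counts out
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = signalsByDevice.
  count. // <-- Dataset[(Int, Long)] of (deviceId, count)
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Complete).
  queryName("signals-per-device").
  start

// eventually...
sq.stop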

Internally, ... FIXME

Specifying Event Time Watermark — withWatermark Operator

withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
Caution
FIXME
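
A minimal sketch of withWatermark in action (the windowed count, the 5-second window and the 10-second delay threshold are arbitrary choices for illustration): the watermark tells the engine how long to wait for late rows, so rows older than the watermark can be dropped and the related aggregation state cleaned up.

// a watermarked, windowed streaming aggregation over the rate source
import org.apache.spark.sql.functions.window
val rates = spark.
  readStream.
  format("rate").
  option("rowsPerSecond", 1).
  load
val windowedCounts = rates.
  withWatermark("timestamp", "10 seconds"). // <-- accept rows up to 10 seconds late
  groupBy(window($"timestamp", "5 seconds")).
  count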