[SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution

## What changes were proposed in this pull request?

According to the [design](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing), the life cycle of `StreamingWrite` should match that of its read-side counterparts, `MicroBatchStream`/`ContinuousStream`: one instance per run of the streaming query, rather than one per epoch.

This PR fixes that: the `StreamingWrite` is now created once, when the streaming query's logical plan is built (in `MicroBatchExecution.logicalPlan` and `ContinuousExecution.logicalPlan`), and every micro-batch or epoch reuses it instead of creating a new one.
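
For intuition, here is a minimal, self-contained Scala sketch of the intended lifecycle (the `StreamingWriteLike`, `EpochWrite`, `QueryRun`, and `LifecycleSketch` names are illustrative placeholders, not Spark APIs): the write object is constructed exactly once per query run, and only a thin per-epoch wrapper is rebuilt, analogous to how `MicroBatchWrite` wraps the long-lived `StreamingWrite` in the diff below.

```scala
// Hypothetical sketch of the intended lifecycle; none of these types are Spark APIs.
trait StreamingWriteLike {
  def commit(epochId: Long): Unit
}

// Per-epoch wrapper around the long-lived write (analogous to MicroBatchWrite).
final class EpochWrite(epochId: Long, write: StreamingWriteLike) {
  def run(): Unit = write.commit(epochId)
}

// One instance per run of the streaming query.
final class QueryRun(createWrite: () => StreamingWriteLike) {
  // Created exactly once, when the run starts (mirrors building it in logicalPlan)...
  private val write: StreamingWriteLike = createWrite()

  // ...and re-wrapped cheaply for every epoch / micro-batch.
  def runEpoch(epochId: Long): Unit = new EpochWrite(epochId, write).run()
}

object LifecycleSketch extends App {
  var creations = 0
  val run = new QueryRun(() => {
    creations += 1
    new StreamingWriteLike {
      override def commit(epochId: Long): Unit = println(s"commit epoch $epochId")
    }
  })
  (0L until 3L).foreach(epoch => run.runEpoch(epoch))
  assert(creations == 1, "one StreamingWrite per query run, not one per epoch")
}
```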

## How was this patch tested?

existing tests

Closes apache#23981 from cloud-fan/dsv2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan authored and mccheah committed May 15, 2019
1 parent 38556e7 commit 3fecdd9
Showing 6 changed files with 104 additions and 94 deletions.
@@ -22,9 +22,8 @@ import java.util.Locale
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.time.SpanSugar._
import scala.collection.JavaConverters._

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -227,39 +226,23 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
val topic = newTopic()
testUtils.createTopic(topic)

/* No topic field or topic option */
var writer: StreamingQuery = null
var ex: Exception = null
try {
writer = createKafkaWriter(input.toDF())(
val ex = intercept[AnalysisException] {
/* No topic field or topic option */
createKafkaWriter(input.toDF())(
withSelectExpr = "value as key", "value"
)
testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
} finally {
writer.stop()
}
assert(ex.getMessage
.toLowerCase(Locale.ROOT)
.contains("topic option required when no 'topic' attribute is present"))

try {
val ex2 = intercept[AnalysisException] {
/* No value field */
writer = createKafkaWriter(input.toDF())(
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "value as key"
)
testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"required attribute 'value' not found"))
}

@@ -278,53 +261,30 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
val topic = newTopic()
testUtils.createTopic(topic)

var writer: StreamingQuery = null
var ex: Exception = null
try {
val ex = intercept[AnalysisException] {
/* topic field wrong type */
writer = createKafkaWriter(input.toDF())(
createKafkaWriter(input.toDF())(
withSelectExpr = s"CAST('1' as INT) as topic", "value"
)
testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))

try {
val ex2 = intercept[AnalysisException] {
/* value field wrong type */
writer = createKafkaWriter(input.toDF())(
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
)
testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"value attribute type must be a string or binary"))

try {
val ex3 = intercept[AnalysisException] {
/* key field wrong type */
writer = createKafkaWriter(input.toDF())(
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
)
testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binary"))
}

@@ -369,35 +329,22 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
var writer: StreamingQuery = null
var ex: Exception = null
try {
writer = createKafkaWriter(

val ex = intercept[IllegalArgumentException] {
createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.key.serializer" -> "foo"))()
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'key.serializer' is not supported"))
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'key.serializer' is not supported"))

try {
writer = createKafkaWriter(
val ex2 = intercept[IllegalArgumentException] {
createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.value.serializer" -> "foo"))()
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'value.serializer' is not supported"))
} finally {
writer.stop()
}
assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'value.serializer' is not supported"))
}

test("generic - write big data with small producer buffer") {
@@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatch
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateControlMicroBatchStream}
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
@@ -122,7 +122,14 @@ class MicroBatchExecution(
case r: StreamingDataSourceV2Relation => r.stream
}
uniqueSources = sources.distinct
_logicalPlan

sink match {
case s: SupportsStreamingWrite =>
val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)

case _ => _logicalPlan
}
}

/**
@@ -513,9 +520,8 @@

val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
case s: SupportsStreamingWrite =>
val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
case _: SupportsStreamingWrite =>
newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}

@@ -585,7 +585,7 @@ abstract class StreamExecution(
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite = {
val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
.withQueryId(runId.toString)
.withQueryId(id.toString)
.withInputDataSchema(inputPlan.schema)
outputMode match {
case Append =>
@@ -61,7 +61,7 @@ class ContinuousExecution(
// Throwable that caused the execution to fail
private val failure: AtomicReference[Throwable] = new AtomicReference[Throwable](null)

override val logicalPlan: LogicalPlan = {
override val logicalPlan: WriteToContinuousDataSource = {
val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
var nextSourceId = 0
val _logicalPlan = analyzedPlan.transform {
@@ -88,7 +88,8 @@
}
uniqueSources = sources.distinct

_logicalPlan
WriteToContinuousDataSource(
createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan)
}

private val triggerExecutor = trigger match {
@@ -178,13 +179,10 @@
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
}

val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)

reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionForQuery,
planWithSink,
withNewSources,
outputMode,
checkpointFile("state"),
id,
@@ -194,7 +192,7 @@
lastExecution.executedPlan // Force the lazy generation of execution plan
}

val stream = planWithSink.collect {
val stream = withNewSources.collect {
case relation: StreamingDataSourceV2Relation =>
relation.stream.asInstanceOf[ContinuousStream]
}.head
@@ -215,7 +213,13 @@

// Use the parent Spark session for the endpoint since it's where this query ID is registered.
val epochEndpoint = EpochCoordinatorRef.create(
streamingWrite, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
logicalPlan.write,
stream,
this,
epochCoordinatorId,
currentBatchId,
sparkSession,
SparkEnv.get)
val epochUpdateThread = new Thread(new Runnable {
override def run: Unit = {
try {
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.sources

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite

/**
* The logical plan for writing data to a micro-batch stream.
*
* Note that this logical plan does not have a corresponding physical plan, as it will be converted
* to [[WriteToDataSourceV2]] with [[MicroBatchWrite]] before execution.
*/
case class WriteToMicroBatchDataSource(write: StreamingWrite, query: LogicalPlan)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(query)
override def output: Seq[Attribute] = Nil

def createPlan(batchId: Long): WriteToDataSourceV2 = {
WriteToDataSourceV2(new MicroBatchWrite(batchId, write), query)
}
}
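
A usage note: the per-trigger consumption of this new logical node is shown in the MicroBatchExecution hunk earlier in this commit; it is condensed below for reference (the `sink`, `newAttributePlan`, and `currentBatchId` names belong to that class, so this is an excerpt rather than standalone code).

```scala
// Condensed from the MicroBatchExecution change above: each trigger reuses the
// long-lived StreamingWrite carried by WriteToMicroBatchDataSource and only
// rebuilds the batch-scoped MicroBatchWrite wrapper via createPlan.
val triggerLogicalPlan = sink match {
  case _: Sink => newAttributePlan
  case _: SupportsStreamingWrite =>
    newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
  case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
```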
@@ -26,7 +26,8 @@ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -59,6 +60,19 @@ class FakeScanBuilder extends ScanBuilder with Scan {
override def toContinuousStream(checkpointLocation: String): ContinuousStream = new FakeDataStream
}

class FakeWriteBuilder extends WriteBuilder with StreamingWrite {
override def buildForStreaming(): StreamingWrite = this
override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
throw new IllegalStateException("fake sink - cannot actually write")
}
override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
throw new IllegalStateException("fake sink - cannot actually write")
}
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
throw new IllegalStateException("fake sink - cannot actually write")
}
}

trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
@@ -75,7 +89,7 @@ trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
throw new IllegalStateException("fake sink - cannot actually write")
new FakeWriteBuilder
}
}
