[SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite

### What changes were proposed in this pull request?

This PR aims to add `sinkParameter` to check sink options robustly and independently in `DataStreamReaderWriterSuite`.

### Why are the changes needed?

`LastOptions.parameters` is designed to capture options from three call sites: `sourceSchema`, `createSource`, and `createSink`. However, `StreamingQuery.stop` invokes `queryExecutionThread.join`, `runStream`, and `createSource` immediately, which resets the options stored by `createSink`.
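
To make the ordering issue concrete, here is a minimal, Spark-free sketch (the object and method names are illustrative only, not the suite's actual code): both provider callbacks write to the same field, so whichever runs last wins, and the options recorded by `createSink` can be overwritten before the test asserts on them.

```scala
// Minimal, Spark-free illustration of the race; names are illustrative only.
object LastOptionsSketch {
  var parameters: Map[String, String] = null

  def createSink(opts: Map[String, String]): Unit = { parameters = opts }
  def createSource(opts: Map[String, String]): Unit = { parameters = opts }

  def main(args: Array[String]): Unit = {
    createSink(Map("opt1" -> "5"))  // start(): the sink records its options
    createSource(Map.empty)         // stopping the query runs createSource again, clobbering them
    println(parameters)             // Map() -- the sink's options are already gone
  }
}
```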

To catch the `createSink` options, the test suite currently uses the workaround pattern below. However, we have observed occasional flakiness with this pattern. If we store the `createSink` options separately, the workaround is no longer needed and the flakiness is eliminated.

```scala
val query = df.writeStream
  ...
  .start()
assert(LastOptions.parameters(...))
query.stop()
```
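
With a dedicated `sinkParameters` field, the updated tests can chain `.start().stop()` and assert afterwards, since stopping the query no longer clears the recorded sink options (shape taken from the updated tests in the diff below):

```scala
df.writeStream
  ...
  .start()
  .stop()
assert(LastOptions.sinkParameters(...))
```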

### Does this PR introduce _any_ user-facing change?

No. This is a test-only change.

### How was this patch tested?

Pass the newly updated test case.

Closes apache#29730 from dongjoon-hyun/SPARK-32845.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun committed Sep 11, 2020
1 parent f6322d1 commit b4be6a6
Showing 1 changed file with 15 additions and 14 deletions.

```diff
@@ -43,11 +43,13 @@ object LastOptions {
   var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
   var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
   var parameters: Map[String, String] = null
+  var sinkParameters: Map[String, String] = null
   var schema: Option[StructType] = null
   var partitionColumns: Seq[String] = Nil
 
   def clear(): Unit = {
     parameters = null
+    sinkParameters = null
     schema = null
     partitionColumns = null
     reset(mockStreamSourceProvider)
@@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       parameters: Map[String, String],
       partitionColumns: Seq[String],
       outputMode: OutputMode): Sink = {
-    LastOptions.parameters = parameters
+    LastOptions.sinkParameters = parameters
     LastOptions.partitionColumns = partitionColumns
     LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
     (_: Long, _: DataFrame) => {}
@@ -170,20 +172,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     LastOptions.clear()
 
-    val query = df.writeStream
+    df.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("opt1", "5")
       .options(Map("opt2" -> "4"))
       .options(map)
       .option("checkpointLocation", newMetadataDir)
       .start()
+      .stop()
 
-    assert(LastOptions.parameters("opt1") == "5")
-    assert(LastOptions.parameters("opt2") == "4")
-    assert(LastOptions.parameters("opt3") == "3")
-    assert(LastOptions.parameters.contains("checkpointLocation"))
-
-    query.stop()
+    assert(LastOptions.sinkParameters("opt1") == "5")
+    assert(LastOptions.sinkParameters("opt2") == "4")
+    assert(LastOptions.sinkParameters("opt3") == "3")
+    assert(LastOptions.sinkParameters.contains("checkpointLocation"))
   }
 
   test("SPARK-32832: later option should override earlier options for load()") {
@@ -204,7 +205,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .load()
     assert(LastOptions.parameters.isEmpty)
 
-    val query = ds.writeStream
+    ds.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .option("paTh", "1")
@@ -213,8 +214,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .option("patH", "4")
       .option("path", "5")
       .start()
-    assert(LastOptions.parameters("path") == "5")
-    query.stop()
+      .stop()
+    assert(LastOptions.sinkParameters("path") == "5")
   }
 
   test("partitioning") {
@@ -787,13 +788,13 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     withTempDir { checkpointPath =>
       withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true",
         SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
-        val query = df.writeStream
+        df.writeStream
           .format("org.apache.spark.sql.streaming.test")
           .option("path", "tmp4")
           .start("tmp5")
+          .stop()
         // The legacy behavior overwrites the path option.
-        assert(LastOptions.parameters("path") == "tmp5")
-        query.stop()
+        assert(LastOptions.sinkParameters("path") == "tmp5")
       }
     }
   }
```
