Skip to content

Commit

Permalink
[SPARK-38080][TESTS][SS] Flaky test: StreamingQueryManagerSuite: 'awa…
Browse files Browse the repository at this point in the history
…itAnyTermination with timeout and resetTerminated'

### What changes were proposed in this pull request?

Fix a flaky test.

### Why are the changes needed?

`StreamingQueryManagerSuite: 'awaitAnyTermination with timeout and resetTerminated'` is a flaky test.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- The flaky test can be reproduced by adding a `Thread.sleep(100)` in https://github.com/apache/spark/blob/v3.2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L346
- Using the above reproduction to verify the PR.

Closes apache#35372 from zsxwing/SPARK-38080.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 811b7e3)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 78a9168)
  • Loading branch information
zsxwing authored and huaxingao committed Feb 14, 2022
1 parent 3f2a77d commit c38691a
Showing 1 changed file with 8 additions and 0 deletions.
Expand Up @@ -201,6 +201,10 @@ class StreamingQueryManagerSuite extends StreamTest {

// After that query is stopped, awaitAnyTerm should throw exception
eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop
// When `isActive` becomes `false`, `StreamingQueryManager` may not receive the error yet.
// Hence, call `stop` to wait until the thread of `q3` exits so that we can ensure
// `StreamingQueryManager` has already received the error.
q3.stop()
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 100.milliseconds,
Expand All @@ -217,6 +221,10 @@ class StreamingQueryManagerSuite extends StreamTest {
require(!q4.isActive)
val q5 = stopRandomQueryAsync(10.milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
// When `isActive` becomes `false`, `StreamingQueryManager` may not receive the error yet.
// Hence, call `stop` to wait until the thread of `q5` exits so that we can ensure
// `StreamingQueryManager` has already received the error.
q5.stop()
// After q5 terminates with exception, awaitAnyTerm should start throwing exception
testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2.seconds)
}
Expand Down

0 comments on commit c38691a

Please sign in to comment.