diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index 96f7efeef98e6..04545cd53c799 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -201,6 +201,10 @@ class StreamingQueryManagerSuite extends StreamTest { // After that query is stopped, awaitAnyTerm should throw exception eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop + // When `isActive` becomes `false`, `StreamingQueryManager` may not receive the error yet. + // Hence, call `stop` to wait until the thread of `q3` exits so that we can ensure + // `StreamingQueryManager` has already received the error. + q3.stop() testAwaitAnyTermination( ExpectException[SparkException], awaitTimeout = 100.milliseconds, @@ -217,6 +221,10 @@ class StreamingQueryManagerSuite extends StreamTest { require(!q4.isActive) val q5 = stopRandomQueryAsync(10.milliseconds, withError = true) eventually(Timeout(streamingTimeout)) { require(!q5.isActive) } + // When `isActive` becomes `false`, `StreamingQueryManager` may not receive the error yet. + // Hence, call `stop` to wait until the thread of `q5` exits so that we can ensure + // `StreamingQueryManager` has already received the error. + q5.stop() // After q5 terminates with exception, awaitAnyTerm should start throwing exception testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2.seconds) }