[SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException

Before a Kafka consumer gets assigned partitions, its offset will contain 0 partitions. However, `runContinuous` will still run and launch a Spark job having 0 partitions. In this case, there is a race where the epoch update may interrupt the query execution thread after `lastExecution.toRdd` returns, so either `epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` or the next `runContinuous` call gets interrupted unintentionally.

To handle this case, this PR has the following changes:

- Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase the waiting time of `stop` but should be minor because the operations here are very fast (just sending an RPC message in the same process and stopping a very simple thread).
- Clear the interrupted status at the end so that it won't impact the `runContinuous` call. We may clear the interrupted status set by `stop`, but it doesn't affect the query termination because `runActivatedStream` will check `state` and exit accordingly.
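The second point relies on `Thread.interrupted()` both returning and clearing the current thread's interrupted status, so a stray interrupt delivered during cleanup cannot leak into the next blocking call. A minimal standalone sketch of that behavior (not Spark code; the object name is illustrative):

```scala
// Sketch: Thread.interrupted() reads AND clears the interrupted status,
// unlike Thread.currentThread().isInterrupted, which only reads it.
object InterruptClearingSketch {
  def cleanupThenClear(): Boolean = {
    // Simulate an interrupt arriving while cleanup is running.
    Thread.currentThread().interrupt()
    // ... cleanup work would run here, shielded from interruption ...
    // Clear the status so a later blocking call is not interrupted spuriously.
    val wasInterrupted = Thread.interrupted()
    val stillInterrupted = Thread.currentThread().isInterrupted
    wasInterrupted && !stillInterrupted
  }

  def main(args: Array[String]): Unit = {
    assert(cleanupThenClear())
    println("interrupted status cleared")
  }
}
```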

I also updated the cleanup code to make sure exceptions thrown from `epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` won't stop the rest of the cleanup.
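The pattern used for that is a nested try/finally: even if the first step throws, the `finally` block still runs the remaining steps before the exception propagates. A hedged sketch with hypothetical stand-ins for the real RPC and shutdown calls:

```scala
// Sketch only: the strings stand in for the real cleanup steps
// (rpcEnv.stop, epochUpdateThread.join, stopSources, cancelJobGroup).
object CleanupOrderSketch {
  def cleanup(): Seq[String] = {
    val done = scala.collection.mutable.ArrayBuffer[String]()
    try {
      try {
        // Stand-in for epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
        throw new RuntimeException("RPC failed")
      } finally {
        // These run even though the ask above threw.
        done += "stop endpoint"
        done += "stop sources"
      }
    } catch {
      // The exception still propagates after the finally block completes.
      case e: RuntimeException => done += s"caught: ${e.getMessage}"
    }
    done.toSeq
  }

  def main(args: Array[String]): Unit = {
    assert(cleanup() == Seq("stop endpoint", "stop sources", "caught: RPC failed"))
    println("later cleanup steps ran despite the failure")
  }
}
```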

Tested by Jenkins.

Closes apache#24034 from zsxwing/SPARK-27111.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
(cherry picked from commit 6e1c082)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
zsxwing authored and kai-chi committed Aug 1, 2019
1 parent d7a7d72 commit 37de7cf
Showing 1 changed file with 24 additions and 8 deletions.
@@ -272,14 +272,30 @@ class ContinuousExecution(
       logInfo(s"Query $id ignoring exception from reconfiguring: $t")
       // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
-      epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
-      SparkEnv.get.rpcEnv.stop(epochEndpoint)
-
-      epochUpdateThread.interrupt()
-      epochUpdateThread.join()
-
-      stopSources()
-      sparkSession.sparkContext.cancelJobGroup(runId.toString)
+      // The above execution may finish before getting interrupted, for example, a Spark job having
+      // 0 partitions will complete immediately. Then the interrupted status will sneak here.
+      //
+      // To handle this case, we do the two things here:
+      //
+      // 1. Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase
+      //    the waiting time of `stop` but should be minor because the operations here are very fast
+      //    (just sending an RPC message in the same process and stopping a very simple thread).
+      // 2. Clear the interrupted status at the end so that it won't impact the `runContinuous`
+      //    call. We may clear the interrupted status set by `stop`, but it doesn't affect the query
+      //    termination because `runActivatedStream` will check `state` and exit accordingly.
+      queryExecutionThread.runUninterruptibly {
+        try {
+          epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
+        } finally {
+          SparkEnv.get.rpcEnv.stop(epochEndpoint)
+          epochUpdateThread.interrupt()
+          epochUpdateThread.join()
+          stopSources()
+          // The following line must be the last line because it may fail if SparkContext is stopped
+          sparkSession.sparkContext.cancelJobGroup(runId.toString)
+        }
+      }
+      Thread.interrupted()
     }
   }
