diff --git a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala index 6f673e518f6d4..96e6803c038d9 100644 --- a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala @@ -87,28 +87,37 @@ class AkkaRpcEnvSuite extends RpcEnvSuite { assert(result.nonEmpty) - // Ask with delayed response + // Ask with a delayed response and wait for response immediately that should timeout + fut = sleepyActor.ask("doh")(timeout.duration).mapTo[String] + val msg1 = + intercept[RpcTimeoutException] { + timeout.awaitResult(fut) + }.getMessage() + + assert(msg1.contains(shortProp)) + + // Ask with delayed response using addMessageIfTimeout in recover callback fut = sleepyActor.ask("goodbye")(timeout.duration).mapTo[String]. recover(timeout.addMessageIfTimeout) // Allow future to complete with failure using plain Await.result, this will return - // once the future is complete - val msg1 = + // once the future is complete to verify addMessageIfTimeout was invoked + val msg2 = intercept[RpcTimeoutException] { Await.result(fut, 200 millis) }.getMessage() - assert(msg1.contains(shortProp)) + assert(msg2.contains(shortProp)) // Use RpcTimeout.awaitResult to process Future, since it has already failed with // RpcTimeoutException, the same exception should be thrown - val msg2 = + val msg3 = intercept[RpcTimeoutException] { timeout.awaitResult(fut) }.getMessage() // Ensure description is not in message twice after addMessageIfTimeout and awaitResult - assert(shortProp.r.findAllIn(msg2).length === 1) + assert(shortProp.r.findAllIn(msg3).length === 1) } finally { system.shutdown()