diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 70834636b3235..3453e2fc7d2c4 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -575,6 +575,66 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } } + test("ask a message timeout on Future using RpcTimeout") { + case class SleepyReply(msg: String) + + val rpcEndpointRef = env.setupEndpoint("ask-future", new RpcEndpoint { + override val rpcEnv = env + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case msg: String => { + context.reply(msg) + } + case sr: SleepyReply => { + Thread.sleep(50) + context.reply(sr.msg) + } + } + }) + + val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout") + val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout") + + // Ask with immediate response, should complete successfully + val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout) + val reply1 = longTimeout.awaitResult(fut1) + assert("hello" === reply1) + + // Ask with a delayed response and wait for response immediately that should timeout + val fut2 = rpcEndpointRef.ask[String](SleepyReply("doh"), shortTimeout) + val reply2 = + intercept[RpcTimeoutException] { + shortTimeout.awaitResult(fut2) + }.getMessage + + // RpcTimeout.awaitResult should have added the property to the TimeoutException message + assert(reply2.contains(shortTimeout.timeoutProp)) + + // Ask with delayed response and allow the Future to timeout before Await.result + val fut3 = rpcEndpointRef.ask[String](SleepyReply("goodbye"),shortTimeout) + + // Allow future to complete with failure using plain Await.result, this will return + // once the future is complete to verify addMessageIfTimeout was invoked + val reply3 = + intercept[RpcTimeoutException] { + Await.result(fut3, 200 millis) + }.getMessage + + // When the future timed out, the recover callback should have used + // RpcTimeout.addMessageIfTimeout to add the property to the TimeoutException message + assert(reply3.contains(shortTimeout.timeoutProp)) + + // Use RpcTimeout.awaitResult to process Future, since it has already failed with + // RpcTimeoutException, the same RpcTimeoutException should be thrown + val reply4 = + intercept[RpcTimeoutException] { + shortTimeout.awaitResult(fut3) + }.getMessage + + // Ensure description is not in message twice after addMessageIfTimeout and awaitResult + assert(shortTimeout.timeoutProp.r.findAllIn(reply4).length === 1) + } + } class UnserializableClass diff --git a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala index b40c7eefdcdb7..54289427116ab 100644 --- a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala @@ -56,73 +56,4 @@ class AkkaRpcEnvSuite extends RpcEnvSuite { } } - test("timeout on ask Future with RpcTimeout") { - - class EchoActor(sleepDuration: Long) extends Actor { - def receive: Receive = { - case msg => - Thread.sleep(sleepDuration) - sender() ! msg - } - } - - val akkaConf = ConfigFactory.empty().withValue("akka.log-dead-letters", - ConfigValueFactory.fromAnyRef("off")) - val system = ActorSystem("EchoSystem", akkaConf) - val echoActor = system.actorOf(Props(new EchoActor(0)), name = "echo") - val sleepyActor = system.actorOf(Props(new EchoActor(50)), name = "sleepy") - - val longProp = "spark.rpc.long.timeout" - val longTimeout = new RpcTimeout(1 second, longProp) - val shortProp = "spark.rpc.short.timeout" - val shortTimeout = new RpcTimeout(10 millis, shortProp) - - try { - - // Ask with immediate response - var fut = echoActor.ask("hello")(longTimeout.duration).mapTo[String]. - recover(longTimeout.addMessageIfTimeout) - - // This should complete successfully - val result = longTimeout.awaitResult(fut) - - assert(result.nonEmpty) - - // Ask with a delayed response and wait for response immediately that should timeout - fut = sleepyActor.ask("doh")(shortTimeout.duration).mapTo[String] - val msg1 = - intercept[RpcTimeoutException] { - shortTimeout.awaitResult(fut) - }.getMessage() - - assert(msg1.contains(shortProp)) - - // Ask with delayed response using addMessageIfTimeout in recover callback - fut = sleepyActor.ask("goodbye")(shortTimeout.duration).mapTo[String]. - recover(shortTimeout.addMessageIfTimeout) - - // Allow future to complete with failure using plain Await.result, this will return - // once the future is complete to verify addMessageIfTimeout was invoked - val msg2 = - intercept[RpcTimeoutException] { - Await.result(fut, 200 millis) - }.getMessage() - - assert(msg2.contains(shortProp)) - - // Use RpcTimeout.awaitResult to process Future, since it has already failed with - // RpcTimeoutException, the same exception should be thrown - val msg3 = - intercept[RpcTimeoutException] { - shortTimeout.awaitResult(fut) - }.getMessage() - - // Ensure description is not in message twice after addMessageIfTimeout and awaitResult - assert(shortProp.r.findAllIn(msg3).length === 1) - - } finally { - system.shutdown() - } - } - }