From fa6ed8203b18ccf2982c5a64758a6fb33b538212 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 23 Jun 2015 16:38:05 -0700 Subject: [PATCH] [SPARK-6980] Had to increase timeout on positive test case because a processor slowdown could trigger an Future TimeoutException --- .../spark/rpc/akka/AkkaRpcEnvSuite.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala index 96e6803c038d9..c0240c974ff7d 100644 --- a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala @@ -73,32 +73,34 @@ class AkkaRpcEnvSuite extends RpcEnvSuite { val echoActor = system.actorOf(Props(new EchoActor(0)), name = "echo") val sleepyActor = system.actorOf(Props(new EchoActor(50)), name = "sleepy") + val longProp = "spark.rpc.long.timeout" + val longTimeout = new RpcTimeout(1 second, longProp) val shortProp = "spark.rpc.short.timeout" - val timeout = new RpcTimeout(10 millis, shortProp) + val shortTimeout = new RpcTimeout(10 millis, shortProp) try { // Ask with immediate response - var fut = echoActor.ask("hello")(timeout.duration).mapTo[String]. - recover(timeout.addMessageIfTimeout) + var fut = echoActor.ask("hello")(longTimeout.duration).mapTo[String]. + recover(longTimeout.addMessageIfTimeout) // This should complete successfully - val result = timeout.awaitResult(fut) + val result = longTimeout.awaitResult(fut) assert(result.nonEmpty) // Ask with a delayed response and wait for response immediately that should timeout - fut = sleepyActor.ask("doh")(timeout.duration).mapTo[String] + fut = sleepyActor.ask("doh")(shortTimeout.duration).mapTo[String] val msg1 = intercept[RpcTimeoutException] { - timeout.awaitResult(fut) + shortTimeout.awaitResult(fut) }.getMessage() assert(msg1.contains(shortProp)) // Ask with delayed response using addMessageIfTimeout in recover callback - fut = sleepyActor.ask("goodbye")(timeout.duration).mapTo[String]. - recover(timeout.addMessageIfTimeout) + fut = sleepyActor.ask("goodbye")(shortTimeout.duration).mapTo[String]. + recover(shortTimeout.addMessageIfTimeout) // Allow future to complete with failure using plain Await.result, this will return // once the future is complete to verify addMessageIfTimeout was invoked @@ -113,7 +115,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite { // RpcTimeoutException, the same exception should be thrown val msg3 = intercept[RpcTimeoutException] { - timeout.awaitResult(fut) + shortTimeout.awaitResult(fut) }.getMessage() // Ensure description is not in message twice after addMessageIfTimeout and awaitResult