Skip to content

Commit

Permalink
[SPARK-6980] Had to increase timeout on positive test case because a …
Browse files Browse the repository at this point in the history
…processor slowdown could trigger an Future TimeoutException
  • Loading branch information
BryanCutler committed Jun 23, 2015
1 parent b05d449 commit fa6ed82
Showing 1 changed file with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,32 +73,34 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
val echoActor = system.actorOf(Props(new EchoActor(0)), name = "echo")
val sleepyActor = system.actorOf(Props(new EchoActor(50)), name = "sleepy")

val longProp = "spark.rpc.long.timeout"
val longTimeout = new RpcTimeout(1 second, longProp)
val shortProp = "spark.rpc.short.timeout"
val timeout = new RpcTimeout(10 millis, shortProp)
val shortTimeout = new RpcTimeout(10 millis, shortProp)

try {

// Ask with immediate response
var fut = echoActor.ask("hello")(timeout.duration).mapTo[String].
recover(timeout.addMessageIfTimeout)
var fut = echoActor.ask("hello")(longTimeout.duration).mapTo[String].
recover(longTimeout.addMessageIfTimeout)

// This should complete successfully
val result = timeout.awaitResult(fut)
val result = longTimeout.awaitResult(fut)

assert(result.nonEmpty)

// Ask with a delayed response and wait for response immediately that should timeout
fut = sleepyActor.ask("doh")(timeout.duration).mapTo[String]
fut = sleepyActor.ask("doh")(shortTimeout.duration).mapTo[String]
val msg1 =
intercept[RpcTimeoutException] {
timeout.awaitResult(fut)
shortTimeout.awaitResult(fut)
}.getMessage()

assert(msg1.contains(shortProp))

// Ask with delayed response using addMessageIfTimeout in recover callback
fut = sleepyActor.ask("goodbye")(timeout.duration).mapTo[String].
recover(timeout.addMessageIfTimeout)
fut = sleepyActor.ask("goodbye")(shortTimeout.duration).mapTo[String].
recover(shortTimeout.addMessageIfTimeout)

// Allow future to complete with failure using plain Await.result, this will return
// once the future is complete to verify addMessageIfTimeout was invoked
Expand All @@ -113,7 +115,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
// RpcTimeoutException, the same exception should be thrown
val msg3 =
intercept[RpcTimeoutException] {
timeout.awaitResult(fut)
shortTimeout.awaitResult(fut)
}.getMessage()

// Ensure description is not in message twice after addMessageIfTimeout and awaitResult
Expand Down

0 comments on commit fa6ed82

Please sign in to comment.