Skip to content

Commit

Permalink
[SPARK-6980] Rewrote Akka RpcTimeout UTs in RpcEnvSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanCutler committed Jun 26, 2015
1 parent 7636189 commit 3a168c7
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 69 deletions.
60 changes: 60 additions & 0 deletions core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,66 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}

test("ask a message timeout on Future using RpcTimeout") {
case class SleepyReply(msg: String)

val rpcEndpointRef = env.setupEndpoint("ask-future", new RpcEndpoint {
override val rpcEnv = env

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case msg: String => {
context.reply(msg)
}
case sr: SleepyReply => {
Thread.sleep(50)
context.reply(sr.msg)
}
}
})

val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout")
val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout")

// Ask with immediate response, should complete successfully
val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
val reply1 = longTimeout.awaitResult(fut1)
assert("hello" === reply1)

// Ask with a delayed response and wait for response immediately that should timeout
val fut2 = rpcEndpointRef.ask[String](SleepyReply("doh"), shortTimeout)
val reply2 =
intercept[RpcTimeoutException] {
shortTimeout.awaitResult(fut2)
}.getMessage

// RpcTimeout.awaitResult should have added the property to the TimeoutException message
assert(reply2.contains(shortTimeout.timeoutProp))

// Ask with delayed response and allow the Future to timeout before Await.result
val fut3 = rpcEndpointRef.ask[String](SleepyReply("goodbye"),shortTimeout)

// Allow future to complete with failure using plain Await.result, this will return
// once the future is complete to verify addMessageIfTimeout was invoked
val reply3 =
intercept[RpcTimeoutException] {
Await.result(fut3, 200 millis)
}.getMessage

// When the future timed out, the recover callback should have used
// RpcTimeout.addMessageIfTimeout to add the property to the TimeoutException message
assert(reply3.contains(shortTimeout.timeoutProp))

// Use RpcTimeout.awaitResult to process Future, since it has already failed with
// RpcTimeoutException, the same RpcTimeoutException should be thrown
val reply4 =
intercept[RpcTimeoutException] {
shortTimeout.awaitResult(fut3)
}.getMessage

// Ensure description is not in message twice after addMessageIfTimeout and awaitResult
assert(shortTimeout.timeoutProp.r.findAllIn(reply4).length === 1)
}

}

class UnserializableClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,73 +56,4 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
}
}

test("timeout on ask Future with RpcTimeout") {

class EchoActor(sleepDuration: Long) extends Actor {
def receive: Receive = {
case msg =>
Thread.sleep(sleepDuration)
sender() ! msg
}
}

val akkaConf = ConfigFactory.empty().withValue("akka.log-dead-letters",
ConfigValueFactory.fromAnyRef("off"))
val system = ActorSystem("EchoSystem", akkaConf)
val echoActor = system.actorOf(Props(new EchoActor(0)), name = "echo")
val sleepyActor = system.actorOf(Props(new EchoActor(50)), name = "sleepy")

val longProp = "spark.rpc.long.timeout"
val longTimeout = new RpcTimeout(1 second, longProp)
val shortProp = "spark.rpc.short.timeout"
val shortTimeout = new RpcTimeout(10 millis, shortProp)

try {

// Ask with immediate response
var fut = echoActor.ask("hello")(longTimeout.duration).mapTo[String].
recover(longTimeout.addMessageIfTimeout)

// This should complete successfully
val result = longTimeout.awaitResult(fut)

assert(result.nonEmpty)

// Ask with a delayed response and wait for response immediately that should timeout
fut = sleepyActor.ask("doh")(shortTimeout.duration).mapTo[String]
val msg1 =
intercept[RpcTimeoutException] {
shortTimeout.awaitResult(fut)
}.getMessage()

assert(msg1.contains(shortProp))

// Ask with delayed response using addMessageIfTimeout in recover callback
fut = sleepyActor.ask("goodbye")(shortTimeout.duration).mapTo[String].
recover(shortTimeout.addMessageIfTimeout)

// Allow future to complete with failure using plain Await.result, this will return
// once the future is complete to verify addMessageIfTimeout was invoked
val msg2 =
intercept[RpcTimeoutException] {
Await.result(fut, 200 millis)
}.getMessage()

assert(msg2.contains(shortProp))

// Use RpcTimeout.awaitResult to process Future, since it has already failed with
// RpcTimeoutException, the same exception should be thrown
val msg3 =
intercept[RpcTimeoutException] {
shortTimeout.awaitResult(fut)
}.getMessage()

// Ensure description is not in message twice after addMessageIfTimeout and awaitResult
assert(shortProp.r.findAllIn(msg3).length === 1)

} finally {
system.shutdown()
}
}

}

0 comments on commit 3a168c7

Please sign in to comment.