Skip to content

Commit

Permalink
[SPARK-6980] Added some RpcTimeout unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanCutler committed May 16, 2015
1 parent 78a2c0a commit 5b59a44
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,23 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
})

val conf = new SparkConf()
val shortProp = "spark.rpc.short.timeout"
conf.set("spark.rpc.retry.wait", "0")
conf.set("spark.rpc.numRetries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 13345)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
try {
val e = intercept[Exception] {
rpcEndpointRef.askWithRetry[String]("hello", 1 millis)
rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1 millis, shortProp))
}
assert(e.isInstanceOf[TimeoutException] || e.getCause.isInstanceOf[TimeoutException])
e match {
case te: TimeoutException =>
assert(te.getMessage().contains(shortProp))
case e: Exception =>
assert(e.getCause().getMessage().contains(shortProp))
}
} finally {
anotherEnv.shutdown()
anotherEnv.awaitTermination()
Expand Down Expand Up @@ -539,6 +546,22 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
}
}

test("construction of RpcTimeout using properties") {
val conf = new SparkConf

val testProp = "spark.ask.test.timeout"
val testDurationSeconds = 30

conf.set(testProp, testDurationSeconds.toString + "s")

val rt = RpcTimeout(conf, testProp)
assert( testDurationSeconds === rt.duration.toSeconds )

val ex = intercept[Throwable] {
RpcTimeout(conf, "spark.ask.invalid.timeout")
}
}

}

class UnserializableClass
Expand Down

0 comments on commit 5b59a44

Please sign in to comment.