Skip to content

Commit

Permalink
[SPARK-6980] Moved MessagePrefix to createRpcTimeoutException directly
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanCutler committed Jun 15, 2015
1 parent 1517721 commit 1394de6
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 12 deletions.
17 changes: 6 additions & 11 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,19 +197,16 @@ private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
* Associates a timeout with a description so that a when a TimeoutException occurs, additional
* context about the timeout can be amended to the exception message.
* @param timeout timeout duration in seconds
* @param description description to be displayed in a timeout exception
* @param conf the configuration parameter that controls this timeout
*/
private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
private[spark] class RpcTimeout(timeout: FiniteDuration, val conf: String) {

/** Get the timeout duration */
def duration: FiniteDuration = timeout

/** Get the message associated with this timeout */
def message: String = description

/** Amends the standard message of TimeoutException to include the description */
private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
new RpcTimeoutException(te.getMessage() + " " + description, te)
new RpcTimeoutException(te.getMessage() + ". This timeout is controlled by " + conf, te)
}

/**
Expand Down Expand Up @@ -244,8 +241,6 @@ private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {

private[spark] object RpcTimeout {

private[this] val messagePrefix = "This timeout is controlled by "

/**
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
Expand All @@ -255,7 +250,7 @@ private[spark] object RpcTimeout {
*/
def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = {
val timeout = { conf.getTimeAsSeconds(timeoutProp) seconds }
new RpcTimeout(timeout, messagePrefix + timeoutProp)
new RpcTimeout(timeout, timeoutProp)
}

/**
Expand All @@ -268,7 +263,7 @@ private[spark] object RpcTimeout {
*/
def apply(conf: SparkConf, timeoutProp: String, defaultValue: String): RpcTimeout = {
val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
new RpcTimeout(timeout, messagePrefix + timeoutProp)
new RpcTimeout(timeout, timeoutProp)
}

/**
Expand All @@ -292,6 +287,6 @@ private[spark] object RpcTimeout {
}
val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue)
val timeout = { Utils.timeStringAsSeconds(finalProp._2) seconds }
new RpcTimeout(timeout, messagePrefix + finalProp._1)
new RpcTimeout(timeout, finalProp._1)
}
}
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val defaultDurationSeconds = 1
val rt3 = RpcTimeout(conf, Seq(defaultProp), defaultDurationSeconds.toString + "s")
assert( defaultDurationSeconds === rt3.duration.toSeconds )
assert( rt3.message.contains(defaultProp) )
assert( rt3.conf.contains(defaultProp) )

// Try to construct RpcTimeout with an unconfigured property
intercept[Throwable] {
Expand Down

0 comments on commit 1394de6

Please sign in to comment.