Skip to content

Commit

Permalink
[SPARK-6980] [CORE] Akka timeout exceptions indicate which conf contr…
Browse files Browse the repository at this point in the history
…ols them (RPC Layer)

Latest changes after refactoring to the RPC layer.  I rebased against trunk to make sure to get any recent changes since it had been a while.  I wasn't crazy about the name `ConfigureTimeout` and `RpcTimeout` seemed to fit better, but I'm open to suggestions!

I ran most of the tests and they pass, but others would get stuck with "WARN TaskSchedulerImpl: Initial job has not accepted any resources".  I think its just my machine, so I'd though I would push what I have anyway.

Still left to do:
* I only added a couple unit tests so far, there are probably some more cases to test
* Make sure all uses require a `RpcTimeout`
* Right now, both the `ask` and `Await.result` use the same timeout, should we differentiate between these in the TimeoutException message?
* I wrapped `Await.result` in `RpcTimeout`, should we also wrap `Await.ready`?
* Proper scoping of classes and methods

hardmettle, feel free to help out with any of these!

Author: Bryan Cutler <bjcutler@us.ibm.com>
Author: Harsh Gupta <harsh@Harshs-MacBook-Pro.local>
Author: BryanCutler <cutlerb@gmail.com>

Closes apache#6205 from BryanCutler/configTimeout-6980 and squashes the following commits:

46c8d48 [Bryan Cutler] [SPARK-6980] Changed RpcEnvSuite test to never reply instead of just sleeping, to avoid possible sync issues
06afa53 [Bryan Cutler] [SPARK-6980] RpcTimeout class extends Serializable, was causing error in MasterSuite
7bb70f1 [Bryan Cutler] Merge branch 'master' into configTimeout-6980
dbd5f73 [Bryan Cutler] [SPARK-6980] Changed RpcUtils askRpcTimeout and lookupRpcTimeout scope to private[spark] and improved deprecation warning msg
4e89c75 [Bryan Cutler] [SPARK-6980] Missed one usage of deprecated RpcUtils.askTimeout in YarnSchedulerBackend although it is not being used, and fixed SparkConfSuite UT to not use deprecated RpcUtils functions
6a1c50d [Bryan Cutler] [SPARK-6980] Minor cleanup of test case
7f4d78e [Bryan Cutler] [SPARK-6980] Fixed scala style checks
287059a [Bryan Cutler] [SPARK-6980] Removed extra import in AkkaRpcEnvSuite
3d8b1ff [Bryan Cutler] [SPARK-6980] Cleaned up imports in AkkaRpcEnvSuite
3a168c7 [Bryan Cutler] [SPARK-6980] Rewrote Akka RpcTimeout UTs in RpcEnvSuite
7636189 [Bryan Cutler] [SPARK-6980] Fixed call to askWithReply in DAGScheduler to use RpcTimeout - this was being compiled by auto-tupling and changing the message type of BlockManagerHeartbeat
be11c4e [Bryan Cutler] Merge branch 'master' into configTimeout-6980
039afed [Bryan Cutler] [SPARK-6980] Corrected import organization
218aa50 [Bryan Cutler] [SPARK-6980] Corrected issues from feedback
fadaf6f [Bryan Cutler] [SPARK-6980] Put back in deprecated RpcUtils askTimeout and lookupTimout to fix MiMa errors
fa6ed82 [Bryan Cutler] [SPARK-6980] Had to increase timeout on positive test case because a processor slowdown could trigger an Future TimeoutException
b05d449 [Bryan Cutler] [SPARK-6980] Changed constructor to use val duration instead of getter function, changed name of string property from conf to timeoutProp for consistency
c6cfd33 [Bryan Cutler] [SPARK-6980] Changed UT ask message timeout to explicitly intercept a SparkException
1394de6 [Bryan Cutler] [SPARK-6980] Moved MessagePrefix to createRpcTimeoutException directly
1517721 [Bryan Cutler] [SPARK-6980] RpcTimeout object scope should be private[spark]
2206b4d [Bryan Cutler] [SPARK-6980] Added unit test for ask then immediat awaitReply
1b9beab [Bryan Cutler] [SPARK-6980] Cleaned up import ordering
08f5afc [Bryan Cutler] [SPARK-6980] Added UT for constructing RpcTimeout with default value
d3754d1 [Bryan Cutler] [SPARK-6980] Added akkaConf to prevent dead letter logging
995d196 [Bryan Cutler] [SPARK-6980] Cleaned up import ordering, comments, spacing from PR feedback
7774d56 [Bryan Cutler] [SPARK-6980] Cleaned up UT imports
4351c48 [Bryan Cutler] [SPARK-6980] Added UT for addMessageIfTimeout, cleaned up UTs
1607a5f [Bryan Cutler] [SPARK-6980] Changed addMessageIfTimeout to PartialFunction, cleanup from PR comments
2f94095 [Bryan Cutler] [SPARK-6980] Added addMessageIfTimeout for when a Future is completed with TimeoutException
235919b [Bryan Cutler] [SPARK-6980] Resolved conflicts after master merge
c07d05c [Bryan Cutler] Merge branch 'master' into configTimeout-6980-tmp
b7fb99f [BryanCutler] Merge pull request #2 from hardmettle/configTimeoutUpdates_6980
4be3a8d [Harsh Gupta] Modifying loop condition to find property match
0ee5642 [Harsh Gupta] Changing the loop condition to halt at the first match in the property list for RpcEnv exception catch
f74064d [Harsh Gupta] Retrieving properties from property list using iterator and while loop instead of chained functions
a294569 [Bryan Cutler] [SPARK-6980] Added creation of RpcTimeout with Seq of property keys
23d2f26 [Bryan Cutler] [SPARK-6980] Fixed await result not being handled by RpcTimeout
49f9f04 [Bryan Cutler] [SPARK-6980] Minor cleanup and scala style fix
5b59a44 [Bryan Cutler] [SPARK-6980] Added some RpcTimeout unit tests
78a2c0a [Bryan Cutler] [SPARK-6980] Using RpcTimeout.awaitResult for future in AppClient now
97523e0 [Bryan Cutler] [SPARK-6980] Akka ask timeout description refactored to RPC layer
  • Loading branch information
BryanCutler authored and squito committed Jul 3, 2015
1 parent d983819 commit aa7bbc1
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class WorkerWebUI(
extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
with Logging {

private[ui] val timeout = RpcUtils.askTimeout(worker.conf)
private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)

initialize()

Expand Down
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.spark.rpc

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import scala.reflect.ClassTag

import org.apache.spark.util.RpcUtils
Expand All @@ -32,7 +31,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)

private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

/**
* return the address for the [[RpcEndpointRef]]
Expand All @@ -52,7 +51,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
*
* This method only sends the message once and never retries.
*/
def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
Expand Down Expand Up @@ -91,15 +90,15 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askWithRetry[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
var attempts = 0
var lastException: Exception = null
while (attempts < maxRetries) {
attempts += 1
try {
val future = ask[T](message, timeout)
val result = Await.result(future, timeout)
val result = timeout.awaitResult(future)
if (result == null) {
throw new SparkException("Actor returned null")
}
Expand All @@ -110,10 +109,14 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
lastException = e
logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
}
Thread.sleep(retryWaitMs)

if (attempts < maxRetries) {
Thread.sleep(retryWaitMs)
}
}

throw new SparkException(
s"Error sending message [message = $message]", lastException)
}

}
112 changes: 109 additions & 3 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.spark.rpc

import java.net.URI
import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Future}
import scala.concurrent.{Awaitable, Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.spark.{SecurityManager, SparkConf}
Expand Down Expand Up @@ -66,7 +68,7 @@ private[spark] object RpcEnv {
*/
private[spark] abstract class RpcEnv(conf: SparkConf) {

private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)
private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)

/**
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
Expand Down Expand Up @@ -94,7 +96,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
* Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
*/
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout)
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
}

/**
Expand Down Expand Up @@ -184,3 +186,107 @@ private[spark] object RpcAddress {
RpcAddress(host, port)
}
}


/**
* An exception thrown if RpcTimeout modifies a [[TimeoutException]].
*/
private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
extends TimeoutException(message) { initCause(cause) }


/**
* Associates a timeout with a description so that a when a TimeoutException occurs, additional
* context about the timeout can be amended to the exception message.
* @param duration timeout duration in seconds
* @param timeoutProp the configuration property that controls this timeout
*/
private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)
extends Serializable {

/** Amends the standard message of TimeoutException to include the description */
private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
new RpcTimeoutException(te.getMessage() + ". This timeout is controlled by " + timeoutProp, te)
}

/**
* PartialFunction to match a TimeoutException and add the timeout description to the message
*
* @note This can be used in the recover callback of a Future to add to a TimeoutException
* Example:
* val timeout = new RpcTimeout(5 millis, "short timeout")
* Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
*/
def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
// The exception has already been converted to a RpcTimeoutException so just raise it
case rte: RpcTimeoutException => throw rte
// Any other TimeoutException get converted to a RpcTimeoutException with modified message
case te: TimeoutException => throw createRpcTimeoutException(te)
}

/**
* Wait for the completed result and return it. If the result is not available within this
* timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
* @param awaitable the `Awaitable` to be awaited
* @throws RpcTimeoutException if after waiting for the specified time `awaitable`
* is still not ready
*/
def awaitResult[T](awaitable: Awaitable[T]): T = {
try {
Await.result(awaitable, duration)
} catch addMessageIfTimeout
}
}


private[spark] object RpcTimeout {

/**
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @throws NoSuchElementException if property is not set
*/
def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = {
val timeout = { conf.getTimeAsSeconds(timeoutProp) seconds }
new RpcTimeout(timeout, timeoutProp)
}

/**
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
* Uses the given default value if property is not set
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @param defaultValue default timeout value in seconds if property not found
*/
def apply(conf: SparkConf, timeoutProp: String, defaultValue: String): RpcTimeout = {
val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
new RpcTimeout(timeout, timeoutProp)
}

/**
* Lookup prioritized list of timeout properties in the configuration
* and create a RpcTimeout with the first set property key in the
* description.
* Uses the given default value if property is not set
* @param conf configuration properties containing the timeout
* @param timeoutPropList prioritized list of property keys for the timeout in seconds
* @param defaultValue default timeout value in seconds if no properties found
*/
def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
require(timeoutPropList.nonEmpty)

// Find the first set property or use the default value with the first property
val itr = timeoutPropList.iterator
var foundProp: Option[(String, String)] = None
while (itr.hasNext && foundProp.isEmpty){
val propKey = itr.next()
conf.getOption(propKey).foreach { prop => foundProp = Some(propKey, prop) }
}
val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue)
val timeout = { Utils.timeStringAsSeconds(finalProp._2) seconds }
new RpcTimeout(timeout, finalProp._1)
}
}
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.rpc.akka
import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag
import scala.util.control.NonFatal
Expand Down Expand Up @@ -214,8 +213,11 @@ private[spark] class AkkaRpcEnv private[akka] (

override def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
import actorSystem.dispatcher
actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout).
map(new AkkaRpcEndpointRef(defaultAddress, _, conf))
actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout.duration).
map(new AkkaRpcEndpointRef(defaultAddress, _, conf)).
// this is just in case there is a timeout from creating the future in resolveOne, we want the
// exception to indicate the conf that determines the timeout
recover(defaultLookupTimeout.addMessageIfTimeout)
}

override def uriOf(systemName: String, address: RpcAddress, endpointName: String): String = {
Expand Down Expand Up @@ -295,8 +297,8 @@ private[akka] class AkkaRpcEndpointRef(
actorRef ! AkkaMessage(message, false)
}

override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap {
// The function will run in the calling thread, so it should be short and never block.
case msg @ AkkaMessage(message, reply) =>
if (reply) {
Expand All @@ -307,7 +309,8 @@ private[akka] class AkkaRpcEndpointRef(
}
case AkkaFailure(e) =>
Future.failed(e)
}(ThreadUtils.sameThread).mapTo[T]
}(ThreadUtils.sameThread).mapTo[T].
recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}

override def toString: String = s"${getClass.getSimpleName}($actorRef)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util._
Expand Down Expand Up @@ -188,7 +189,7 @@ class DAGScheduler(
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
BlockManagerHeartbeat(blockManagerId), 600 seconds)
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}

// Called by TaskScheduler when an executor fails.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private[spark] abstract class YarnSchedulerBackend(
private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))

private implicit val askTimeout = RpcUtils.askTimeout(sc.conf)
private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)

/**
* Request executors from the ApplicationMaster by specifying the total number desired.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class BlockManagerMaster(
isDriver: Boolean)
extends Logging {

val timeout = RpcUtils.askTimeout(conf)
val timeout = RpcUtils.askRpcTimeout(conf)

/** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
def removeExecutor(execId: String) {
Expand Down Expand Up @@ -106,7 +106,7 @@ class BlockManagerMaster(
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
}(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
timeout.awaitResult(future)
}
}

Expand All @@ -118,7 +118,7 @@ class BlockManagerMaster(
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
}(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
timeout.awaitResult(future)
}
}

Expand All @@ -132,7 +132,7 @@ class BlockManagerMaster(
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
}(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
timeout.awaitResult(future)
}
}

Expand Down Expand Up @@ -176,8 +176,8 @@ class BlockManagerMaster(
CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
Option[BlockStatus],
Iterable[Option[BlockStatus]]]]
val blockStatus = Await.result(
Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread), timeout)
val blockStatus = timeout.awaitResult(
Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
if (blockStatus == null) {
throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
}
Expand All @@ -199,7 +199,7 @@ class BlockManagerMaster(
askSlaves: Boolean): Seq[BlockId] = {
val msg = GetMatchingBlockIds(filter, askSlaves)
val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
Await.result(future, timeout)
timeout.awaitResult(future)
}

/**
Expand Down
Loading

0 comments on commit aa7bbc1

Please sign in to comment.