Skip to content

Commit

Permalink
[SPARK-4946] [CORE] Using AkkaUtils.askWithReply in MapOutputTracker.…
Browse files Browse the repository at this point in the history
…askTracker to reduce the chance of the communicating problem
  • Loading branch information
YanTangZhai committed Dec 24, 2014
1 parent e4c2c0a commit 9ca6541
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
private val timeout = AkkaUtils.askTimeout(conf)
private val retryAttempts = AkkaUtils.numRetries(conf)
private val retryIntervalMs = AkkaUtils.retryWaitMs(conf)

/** Set to the MapOutputTrackerActor living on the driver. */
var trackerActor: ActorRef = _
Expand Down Expand Up @@ -108,8 +110,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
*/
protected def askTracker(message: Any): Any = {
try {
val future = trackerActor.ask(message)(timeout)
Await.result(future, timeout)
AkkaUtils.askWithReply(message, trackerActor, retryAttempts, retryIntervalMs, timeout)
} catch {
case e: Exception =>
logError("Error communicating with MapOutputTracker", e)
Expand Down

0 comments on commit 9ca6541

Please sign in to comment.