diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index e3a649d755450..47e69a374074a 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -431,7 +431,15 @@ private[spark] object SparkConf extends Logging { "spark.yarn.am.waitTime" -> Seq( AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3", // Translate old value to a duration, with 10s wait time per try. - translation = s => s"${s.toLong * 10}s")) + translation = s => s"${s.toLong * 10}s")), + "spark.rpc.num.retries" -> Seq( + AlternateConfig("spark.akka.num.retries", "1.4")), + "spark.rpc.retry.wait" -> Seq( + AlternateConfig("spark.akka.retry.wait", "1.4")), + "spark.rpc.askTimeout" -> Seq( + AlternateConfig("spark.akka.askTimeout", "1.4")), + "spark.rpc.lookupTimeout" -> Seq( + AlternateConfig("spark.akka.lookupTimeout", "1.4")) ) /** diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 8d13b2a2cd4f3..c2c3e9a9e4827 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -27,7 +27,7 @@ import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} -import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils} +import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils} /** * Proxy that relays messages to the driver. @@ -36,7 +36,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with ActorLogReceive with Logging { var masterActor: ActorSelection = _ - val timeout = AkkaUtils.askTimeout(conf) + val timeout = RpcUtils.askTimeout(conf) override def preStart(): Unit = { masterActor = context.actorSelection( @@ -155,7 +155,7 @@ object Client { if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { conf.set("spark.akka.logLifecycleEvents", "true") } - conf.set("spark.akka.askTimeout", "10") + conf.set("spark.rpc.askTimeout", "10") conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING")) Logger.getRootLogger.setLevel(driverArgs.logLevel) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 4f06d7f96c46e..43c8a934c311a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master -import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils} +import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils} /** * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL, @@ -193,7 +193,7 @@ private[spark] class AppClient( def stop() { if (actor != null) { try { - val timeout = AkkaUtils.askTimeout(conf) + val timeout = RpcUtils.askTimeout(conf) val future = actor.ask(StopAppClient)(timeout) Await.result(future, timeout) } catch { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c5a6b1beac9be..ff2eed6dee70a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -47,7 +47,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer import org.apache.spark.metrics.MetricsSystem import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils} +import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, SignalLogger, Utils} private[master] class Master( host: String, @@ -931,7 +931,7 @@ private[deploy] object Master extends Logging { securityManager = securityMgr) val actor = actorSystem.actorOf( Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName) - val timeout = AkkaUtils.askTimeout(conf) + val timeout = RpcUtils.askTimeout(conf) val portsRequest = actor.ask(BoundPortsRequest)(timeout) val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse] (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index bb11e0642ddc6..aad9c87bdb987 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -21,7 +21,7 @@ import org.apache.spark.Logging import org.apache.spark.deploy.master.Master import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.AkkaUtils +import org.apache.spark.util.RpcUtils /** * Web UI server for the standalone master. @@ -31,7 +31,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging { val masterActorRef = master.self - val timeout = AkkaUtils.askTimeout(master.conf) + val timeout = RpcUtils.askTimeout(master.conf) val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) initialize() diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 4f19af59f409f..2d6b8d4204795 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -32,7 +32,7 @@ import org.json4s._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion} -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils} import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription} import org.apache.spark.deploy.ClientArguments._ @@ -223,7 +223,7 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf) } protected def handleKill(submissionId: String): KillSubmissionResponse = { - val askTimeout = AkkaUtils.askTimeout(conf) + val askTimeout = RpcUtils.askTimeout(conf) val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse]( DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout) val k = new KillSubmissionResponse @@ -257,7 +257,7 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf) } protected def handleStatus(submissionId: String): SubmissionStatusResponse = { - val askTimeout = AkkaUtils.askTimeout(conf) + val askTimeout = RpcUtils.askTimeout(conf) val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse]( DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout) val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) } @@ -321,7 +321,7 @@ private[rest] class SubmitRequestServlet( responseServlet: HttpServletResponse): SubmitRestProtocolResponse = { requestMessage match { case submitRequest: CreateSubmissionRequest => - val askTimeout = AkkaUtils.askTimeout(conf) + val askTimeout = RpcUtils.askTimeout(conf) val driverDescription = buildDriverDescription(submitRequest) val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse]( DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index de6423beb543e..b3bb5f911dbd7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -25,7 +25,7 @@ import org.apache.spark.deploy.worker.Worker import org.apache.spark.deploy.worker.ui.WorkerWebUI._ import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.AkkaUtils +import org.apache.spark.util.RpcUtils /** * Web UI server for the standalone worker. @@ -38,7 +38,7 @@ class WorkerWebUI( extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI") with Logging { - private[ui] val timeout = AkkaUtils.askTimeout(worker.conf) + private[ui] val timeout = RpcUtils.askTimeout(worker.conf) initialize() diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala index cba038ca355d7..a5336b7563802 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala @@ -25,7 +25,7 @@ import scala.language.postfixOps import scala.reflect.ClassTag import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{RpcUtils, Utils} /** * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to @@ -38,7 +38,7 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] abstract class RpcEnv(conf: SparkConf) { - private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf) /** * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement @@ -282,9 +282,9 @@ trait ThreadSafeRpcEndpoint extends RpcEndpoint private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf) extends Serializable with Logging { - private[this] val maxRetries = conf.getInt("spark.akka.num.retries", 3) - private[this] val retryWaitMs = conf.getLong("spark.akka.retry.wait", 3000) - private[this] val defaultAskTimeout = conf.getLong("spark.akka.askTimeout", 30) seconds + private[this] val maxRetries = RpcUtils.numRetries(conf) + private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf) + private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf) /** * return the address for the [[RpcEndpointRef]] diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index f72566c370a6f..1406a36a669c5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -24,7 +24,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.ui.JettyUtils -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{RpcUtils, Utils} import scala.util.control.NonFatal @@ -46,7 +46,7 @@ private[spark] abstract class YarnSchedulerBackend( private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint( YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv)) - private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf) + private implicit val askTimeout = RpcUtils.askTimeout(sc.conf) /** * Request executors from the ApplicationMaster by specifying the total number desired. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index ceacf043029f3..c798843bd5d8a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -23,7 +23,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.storage.BlockManagerMessages._ -import org.apache.spark.util.AkkaUtils +import org.apache.spark.util.RpcUtils private[spark] class BlockManagerMaster( @@ -32,7 +32,7 @@ class BlockManagerMaster( isDriver: Boolean) extends Logging { - val timeout = AkkaUtils.askTimeout(conf) + val timeout = RpcUtils.askTimeout(conf) /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */ def removeExecutor(execId: String) { diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 8e8cc7cc6389e..b725df3b44596 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -19,7 +19,7 @@ package org.apache.spark.util import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.Await -import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.duration.FiniteDuration import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} import akka.pattern.ask @@ -125,16 +125,6 @@ private[spark] object AkkaUtils extends Logging { (actorSystem, boundPort) } - /** Returns the default Spark timeout to use for Akka ask operations. */ - def askTimeout(conf: SparkConf): FiniteDuration = { - Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds") - } - - /** Returns the default Spark timeout to use for Akka remote actor lookup. */ - def lookupTimeout(conf: SparkConf): FiniteDuration = { - Duration.create(conf.getLong("spark.akka.lookupTimeout", 30), "seconds") - } - private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024 /** Returns the configured max frame size for Akka messages in bytes. */ @@ -150,16 +140,6 @@ private[spark] object AkkaUtils extends Logging { /** Space reserved for extra data in an Akka message besides serialized task or task result. */ val reservedSizeBytes = 200 * 1024 - /** Returns the configured number of times to retry connecting */ - def numRetries(conf: SparkConf): Int = { - conf.getInt("spark.akka.num.retries", 3) - } - - /** Returns the configured number of milliseconds to wait on each retry */ - def retryWaitMs(conf: SparkConf): Int = { - conf.getInt("spark.akka.retry.wait", 3000) - } - /** * Send a message to the given actor and get its result within a default timeout, or * throw a SparkException if this fails. @@ -216,7 +196,7 @@ private[spark] object AkkaUtils extends Logging { val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name) - val timeout = AkkaUtils.lookupTimeout(conf) + val timeout = RpcUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } @@ -230,7 +210,7 @@ private[spark] object AkkaUtils extends Logging { val executorActorSystemName = SparkEnv.executorActorSystemName Utils.checkHost(host, "Expected hostname") val url = address(protocol(actorSystem), executorActorSystemName, host, port, name) - val timeout = AkkaUtils.lookupTimeout(conf) + val timeout = RpcUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala index 6665b17c3d5df..9c94a7d11391e 100644 --- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala @@ -17,6 +17,9 @@ package org.apache.spark.util +import scala.concurrent.duration._ +import scala.language.postfixOps + import org.apache.spark.{SparkEnv, SparkConf} import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} @@ -32,4 +35,24 @@ object RpcUtils { Utils.checkHost(driverHost, "Expected hostname") rpcEnv.setupEndpointRef(driverActorSystemName, RpcAddress(driverHost, driverPort), name) } + + /** Returns the configured number of times to retry connecting */ + def numRetries(conf: SparkConf): Int = { + conf.getInt("spark.rpc.num.retries", 3) + } + + /** Returns the configured number of milliseconds to wait on each retry */ + def retryWaitMs(conf: SparkConf): Long = { + conf.getLong("spark.rpc.retry.wait", 3000) + } + + /** Returns the default Spark timeout to use for Rpc ask operations. */ + def askTimeout(conf: SparkConf): FiniteDuration = { + conf.getLong("spark.rpc.askTimeout", 30) seconds + } + + /** Returns the default Spark timeout to use for Rpc remote endpoint lookup. */ + def lookupTimeout(conf: SparkConf): FiniteDuration = { + conf.getLong("spark.rpc.lookupTimeout", 30) seconds + } } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 6295d34be5ca9..6ed057a7cab97 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -154,7 +154,7 @@ class MapOutputTrackerSuite extends FunSuite { test("remote fetch below akka frame size") { val newConf = new SparkConf newConf.set("spark.akka.frameSize", "1") - newConf.set("spark.akka.askTimeout", "1") // Fail fast + newConf.set("spark.rpc.askTimeout", "1") // Fail fast val masterTracker = new MapOutputTrackerMaster(conf) val rpcEnv = createRpcEnv("spark") @@ -180,7 +180,7 @@ class MapOutputTrackerSuite extends FunSuite { test("remote fetch exceeds akka frame size") { val newConf = new SparkConf newConf.set("spark.akka.frameSize", "1") - newConf.set("spark.akka.askTimeout", "1") // Fail fast + newConf.set("spark.rpc.askTimeout", "1") // Fail fast val masterTracker = new MapOutputTrackerMaster(conf) val rpcEnv = createRpcEnv("test") diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 8e6c200c4ba00..157d47d0fddd4 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -222,6 +222,26 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420) } + test("akka deprecated configs") { + val conf = new SparkConf() + + assert(!conf.contains("spark.rpc.num.retries")) + assert(!conf.contains("spark.rpc.retry.wait")) + assert(!conf.contains("spark.rpc.askTimeout")) + assert(!conf.contains("spark.rpc.lookupTimeout")) + + conf.set("spark.akka.num.retries", "1") + assert(conf.get("spark.rpc.num.retries") === "1") + + conf.set("spark.akka.retry.wait", "2") + assert(conf.get("spark.rpc.retry.wait") === "2") + + conf.set("spark.akka.askTimeout", "3") + assert(conf.get("spark.rpc.askTimeout") === "3") + + conf.set("spark.akka.lookupTimeout", "4") + assert(conf.get("spark.rpc.lookupTimeout") === "4") + } } class Class1 {} diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index ada07ef11cd7a..5fbda37c7cb88 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -155,8 +155,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { }) val conf = new SparkConf() - conf.set("spark.akka.retry.wait", "0") - conf.set("spark.akka.num.retries", "1") + conf.set("spark.rpc.retry.wait", "0") + conf.set("spark.rpc.num.retries", "1") val anotherEnv = createRpcEnv(conf, "remote", 13345) // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")