diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 47e69a374074a..c1996e08756a6 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -432,7 +432,7 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
         // Translate old value to a duration, with 10s wait time per try.
         translation = s => s"${s.toLong * 10}s")),
-    "spark.rpc.num.retries" -> Seq(
+    "spark.rpc.numRetries" -> Seq(
       AlternateConfig("spark.akka.num.retries", "1.4")),
     "spark.rpc.retry.wait" -> Seq(
       AlternateConfig("spark.akka.retry.wait", "1.4")),
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index 9c94a7d11391e..5ae793e0e87a3 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -38,21 +38,21 @@ object RpcUtils {
 
   /** Returns the configured number of times to retry connecting */
   def numRetries(conf: SparkConf): Int = {
-    conf.getInt("spark.rpc.num.retries", 3)
+    conf.getInt("spark.rpc.numRetries", 3)
   }
 
   /** Returns the configured number of milliseconds to wait on each retry */
   def retryWaitMs(conf: SparkConf): Long = {
-    conf.getLong("spark.rpc.retry.wait", 3000)
+    conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
   }
 
-  /** Returns the default Spark timeout to use for Rpc ask operations. */
+  /** Returns the default Spark timeout to use for RPC ask operations. */
   def askTimeout(conf: SparkConf): FiniteDuration = {
-    conf.getLong("spark.rpc.askTimeout", 30) seconds
+    conf.getTimeAsSeconds("spark.rpc.askTimeout", "30s") seconds
   }
 
-  /** Returns the default Spark timeout to use for Rpc remote endpoint lookup. */
+  /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
   def lookupTimeout(conf: SparkConf): FiniteDuration = {
-    conf.getLong("spark.rpc.lookupTimeout", 30) seconds
+    conf.getTimeAsSeconds("spark.rpc.lookupTimeout", "30s") seconds
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 157d47d0fddd4..d7d8014a20498 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark
 
 import java.util.concurrent.{TimeUnit, Executors}
 
+import scala.concurrent.duration._
+import scala.language.postfixOps
 import scala.util.{Try, Random}
 
 import org.scalatest.FunSuite
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
 import com.esotericsoftware.kryo.Kryo
 
 class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
@@ -231,16 +233,16 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     assert(!conf.contains("spark.rpc.lookupTimeout"))
 
     conf.set("spark.akka.num.retries", "1")
-    assert(conf.get("spark.rpc.num.retries") === "1")
+    assert(RpcUtils.numRetries(conf) === 1)
 
     conf.set("spark.akka.retry.wait", "2")
-    assert(conf.get("spark.rpc.retry.wait") === "2")
+    assert(RpcUtils.retryWaitMs(conf) === 2L)
 
     conf.set("spark.akka.askTimeout", "3")
-    assert(conf.get("spark.rpc.askTimeout") === "3")
+    assert(RpcUtils.askTimeout(conf) === (3 seconds))
 
     conf.set("spark.akka.lookupTimeout", "4")
-    assert(conf.get("spark.rpc.lookupTimeout") === "4")
+    assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
   }
 }
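
Note (not part of the patch): the switch from getLong to SparkConf.getTimeAsMs and getTimeAsSeconds changes how unit-less legacy values are read. Both accessors accept duration strings such as "3s" or "500ms", and a bare number is interpreted in the calling method's own unit. That is why the updated test expects RpcUtils.retryWaitMs to return 2 (milliseconds) and RpcUtils.askTimeout to return 3 seconds for the legacy values "2" and "3". A minimal sketch of that behavior, with a hypothetical demo object name:

import org.apache.spark.SparkConf

object RpcConfDemo {
  def main(args: Array[String]): Unit = {
    // loadDefaults = false keeps JVM system properties out of the demo.
    val conf = new SparkConf(false)

    // Set only the deprecated Akka keys; reads of the new spark.rpc.* keys
    // fall back to these alternates via SparkConf's deprecation table.
    conf.set("spark.akka.retry.wait", "2")  // no unit suffix
    conf.set("spark.akka.askTimeout", "3")  // no unit suffix

    // A bare number is parsed in the accessor's own unit:
    println(conf.getTimeAsMs("spark.rpc.retry.wait", "3s"))       // prints 2 (ms)
    println(conf.getTimeAsSeconds("spark.rpc.askTimeout", "30s")) // prints 3 (s)
  }
}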