Skip to content

Commit

Permalink
Use getTimeAsMs and getTimeAsSeconds and other minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Apr 21, 2015
1 parent 31dbe69 commit e0d80a9
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
// Translate old value to a duration, with 10s wait time per try.
translation = s => s"${s.toLong * 10}s")),
"spark.rpc.num.retries" -> Seq(
"spark.rpc.numRetries" -> Seq(
AlternateConfig("spark.akka.num.retries", "1.4")),
"spark.rpc.retry.wait" -> Seq(
AlternateConfig("spark.akka.retry.wait", "1.4")),
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/util/RpcUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ object RpcUtils {

/** Returns the configured number of times to retry connecting */
def numRetries(conf: SparkConf): Int = {
conf.getInt("spark.rpc.num.retries", 3)
conf.getInt("spark.rpc.numRetries", 3)
}

/** Returns the configured number of milliseconds to wait on each retry */
def retryWaitMs(conf: SparkConf): Long = {
conf.getLong("spark.rpc.retry.wait", 3000)
conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
}

/** Returns the default Spark timeout to use for Rpc ask operations. */
/** Returns the default Spark timeout to use for RPC ask operations. */
def askTimeout(conf: SparkConf): FiniteDuration = {
conf.getLong("spark.rpc.askTimeout", 30) seconds
conf.getTimeAsSeconds("spark.rpc.askTimeout", "30s") seconds
}

/** Returns the default Spark timeout to use for Rpc remote endpoint lookup. */
/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
def lookupTimeout(conf: SparkConf): FiniteDuration = {
conf.getLong("spark.rpc.lookupTimeout", 30) seconds
conf.getTimeAsSeconds("spark.rpc.lookupTimeout", "30s") seconds
}
}
12 changes: 7 additions & 5 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package org.apache.spark

import java.util.concurrent.{TimeUnit, Executors}

import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Try, Random}

import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.ResetSystemProperties
import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
import com.esotericsoftware.kryo.Kryo

class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
Expand Down Expand Up @@ -231,16 +233,16 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
assert(!conf.contains("spark.rpc.lookupTimeout"))

conf.set("spark.akka.num.retries", "1")
assert(conf.get("spark.rpc.num.retries") === "1")
assert(RpcUtils.numRetries(conf) === 1)

conf.set("spark.akka.retry.wait", "2")
assert(conf.get("spark.rpc.retry.wait") === "2")
assert(RpcUtils.retryWaitMs(conf) === 2L)

conf.set("spark.akka.askTimeout", "3")
assert(conf.get("spark.rpc.askTimeout") === "3")
assert(RpcUtils.askTimeout(conf) === (3 seconds))

conf.set("spark.akka.lookupTimeout", "4")
assert(conf.get("spark.rpc.lookupTimeout") === "4")
assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
}
}

Expand Down

0 comments on commit e0d80a9

Please sign in to comment.