diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 7649258b010b8..ee511ec5370cf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -27,7 +27,7 @@ import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress, RpcEnv, ThreadSafeRpcEn import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} -import org.apache.spark.util.{SparkExitCode, Utils} +import org.apache.spark.util.{ThreadUtils, SparkExitCode, Utils} /** * Proxy that relays messages to the driver. @@ -41,7 +41,7 @@ private class ClientEndpoint( // A scheduled executor used to send messages at the specified time. private val forwardMessageThread = - Utils.newDaemonSingleThreadScheduledExecutor("client-forward-message") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("client-forward-message") // Used to provide the implicit parameter of `Future` methods. private val forwardMessageExecutionContext = ExecutionContext.fromExecutor(forwardMessageThread, diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index c1a7fb4975c39..0247d16f38b43 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -26,7 +26,7 @@ import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master import org.apache.spark.rpc._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL, @@ -70,11 +70,11 @@ private[spark] class AppClient( masterRpcAddresses.size, // Make sure we can register with all masters at the same time 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable](), - Utils.namedThreadFactory("appclient-register-master-threadpool")) + ThreadUtils.namedThreadFactory("appclient-register-master-threadpool")) // A scheduled executor for scheduling the registration actions private val registrationRetryThread = - Utils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread") override def onStart(): Unit = { try { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index a8213cd410c51..6fc850be37046 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -45,7 +45,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer import org.apache.spark.metrics.MetricsSystem import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{SignalLogger, Utils} +import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} private[master] class Master( override val rpcEnv: RpcEnv, @@ -56,7 +56,7 @@ private[master] class Master( extends ThreadSafeRpcEndpoint with Logging with LeaderElectable { private val forwardMessageThread = - Utils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") // TODO Remove it once we don't use akka.serialization.Serialization private val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f6967cc2583f5..d9b91021496ab 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -36,7 +36,7 @@ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc._ -import org.apache.spark.util.{SignalLogger, Utils} +import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} private[worker] class Worker( override val rpcEnv: RpcEnv, @@ -59,9 +59,9 @@ private[worker] class Worker( // A scheduled executor used to send messages at the specified time. private val forwordMessageScheduler = - Utils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler") // A separated thread to clean up the workDir - private val cleanupThread = Utils.newDaemonSingleThreadExecutor("worker-cleanup-thread") + private val cleanupThread = ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread") // Used to provide the implicit parameter of `Future` methods. private val cleanupThreadExecutor = ExecutionContext.fromExecutor(cleanupThread) @@ -144,7 +144,7 @@ private[worker] class Worker( masterRpcAddresses.size, // Make sure we can register with all masters at the same time 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable](), - Utils.namedThreadFactory("worker-register-master-threadpool")) + ThreadUtils.namedThreadFactory("worker-register-master-threadpool")) var coresUsed = 0 var memoryUsed = 0 diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 154ca25c60904..2feb7341b159b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -896,24 +896,6 @@ private[spark] object Utils extends Logging { hostPortParseResults.get(hostPort) } - /** - * Wrapper over newSingleThreadExecutor. Thread names are formatted as prefix-ID, where ID is a - * unique, sequentially assigned integer. - */ - def newDaemonSingleThreadExecutor(prefix: String): ExecutorService = { - val threadFactory = namedThreadFactory(prefix) - Executors.newSingleThreadExecutor(threadFactory) - } - - /** - * Wrapper over newSingleThreadScheduledExecutor. Thread names are formatted as prefix-ID, where - * ID is a unique, sequentially assigned integer. - */ - def newDaemonSingleThreadScheduledExecutor(prefix: String): ScheduledExecutorService = { - val threadFactory = namedThreadFactory(prefix) - Executors.newSingleThreadScheduledExecutor(threadFactory) - } - /** * Return the string to tell how long has passed in milliseconds. */