Use ThreadUtils
zsxwing committed Apr 23, 2015
1 parent 060ff31 commit 25a84d8
Showing 5 changed files with 11 additions and 29 deletions.
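
The commit swaps Spark-internal call sites from thread-pool helpers on Utils over to the same-named helpers on the new ThreadUtils object. As a rough sketch of the three helpers this diff touches (Spark-internal API, usable only from Spark's own code; the name prefixes below are illustrative):

import java.util.concurrent.{ExecutorService, ScheduledExecutorService, ThreadFactory}

import org.apache.spark.util.ThreadUtils

// Threads come out as daemons named "<prefix>-0", "<prefix>-1", ...
val factory: ThreadFactory =
  ThreadUtils.namedThreadFactory("example-pool")

// One daemon worker thread for ordinary tasks.
val single: ExecutorService =
  ThreadUtils.newDaemonSingleThreadExecutor("example-single")

// One daemon worker thread that can also run delayed or periodic tasks.
val scheduler: ScheduledExecutorService =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("example-scheduler")
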
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress, RpcEnv, ThreadSafeRpcEn
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.util.{SparkExitCode, Utils}
+import org.apache.spark.util.{ThreadUtils, SparkExitCode, Utils}
 
 /**
  * Proxy that relays messages to the driver.
@@ -41,7 +41,7 @@ private class ClientEndpoint(
 
   // A scheduled executor used to send messages at the specified time.
   private val forwardMessageThread =
-    Utils.newDaemonSingleThreadScheduledExecutor("client-forward-message")
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("client-forward-message")
   // Used to provide the implicit parameter of `Future` methods.
   private val forwardMessageExecutionContext =
     ExecutionContext.fromExecutor(forwardMessageThread,
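
The hunk above pairs the scheduler with an ExecutionContext so Future bodies and callbacks run on the same daemon thread. In isolation the pattern looks roughly like this; the error reporter is a hypothetical stand-in for the lines GitHub collapses below the hunk:

import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.util.ThreadUtils

val forwardMessageThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("client-forward-message")

// Used to provide the implicit parameter of `Future` methods.
implicit val forwardMessageExecutionContext: ExecutionContext =
  ExecutionContext.fromExecutor(forwardMessageThread,
    (t: Throwable) => t.printStackTrace()) // placeholder reporter, not Spark's

// Any Future built here executes on the single forward-message thread.
Future { /* e.g. relay a message to the driver */ }
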
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -26,7 +26,7 @@ import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.rpc._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -70,11 +70,11 @@ private[spark] class AppClient(
     masterRpcAddresses.size, // Make sure we can register with all masters at the same time
     60L, TimeUnit.SECONDS,
     new SynchronousQueue[Runnable](),
-    Utils.namedThreadFactory("appclient-register-master-threadpool"))
+    ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
 
   // A scheduled executor for scheduling the registration actions
   private val registrationRetryThread =
-    Utils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
 
   override def onStart(): Unit = {
     try {
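
Here namedThreadFactory feeds a hand-built ThreadPoolExecutor sized so one registration task per master can run concurrently, with a SynchronousQueue handing each task straight to a thread instead of queueing. A self-contained approximation; the core-size argument and the master count are assumptions, since GitHub collapses the first lines of the call:

import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import org.apache.spark.util.ThreadUtils

val masterCount = 2 // placeholder for masterRpcAddresses.size

val registerMasterThreadPool = new ThreadPoolExecutor(
  0, // assumed core size; the diff only shows the arguments below
  masterCount, // make sure we can register with all masters at the same time
  60L, TimeUnit.SECONDS, // idle threads are reclaimed after a minute
  new SynchronousQueue[Runnable](), // direct handoff, no task queue
  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
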
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -45,7 +45,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
 private[master] class Master(
     override val rpcEnv: RpcEnv,
@@ -56,7 +56,7 @@ private[master] class Master(
   extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
 
   private val forwardMessageThread =
-    Utils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
 
   // TODO Remove it once we don't use akka.serialization.Serialization
   private val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
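
Master's forward-message thread is a ScheduledExecutorService, so periodic self-sends can be posted to it; the sketch below shows the shape, with a placeholder task body that is not part of this diff:

import java.util.concurrent.TimeUnit

import org.apache.spark.util.ThreadUtils

val forwardMessageThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")

// Placeholder periodic job, e.g. a worker-timeout check once a minute.
forwardMessageThread.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    // stand-in body; the real Master sends itself a check message here
  }
}, 0, 60, TimeUnit.SECONDS)
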
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -36,7 +36,7 @@ import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
 private[worker] class Worker(
     override val rpcEnv: RpcEnv,
@@ -59,9 +59,9 @@ private[worker] class Worker(
 
   // A scheduled executor used to send messages at the specified time.
   private val forwordMessageScheduler =
-    Utils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
   // A separated thread to clean up the workDir
-  private val cleanupThread = Utils.newDaemonSingleThreadExecutor("worker-cleanup-thread")
+  private val cleanupThread = ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread")
   // Used to provide the implicit parameter of `Future` methods.
   private val cleanupThreadExecutor = ExecutionContext.fromExecutor(cleanupThread)
 
@@ -144,7 +144,7 @@ private[worker] class Worker(
     masterRpcAddresses.size, // Make sure we can register with all masters at the same time
     60L, TimeUnit.SECONDS,
     new SynchronousQueue[Runnable](),
-    Utils.namedThreadFactory("worker-register-master-threadpool"))
+    ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
 
   var coresUsed = 0
   var memoryUsed = 0
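
Worker wires its cleanup thread the same way Client wires its scheduler: the dedicated daemon executor is wrapped in an ExecutionContext so a Future can sweep workDir without blocking the RPC endpoint thread. A minimal sketch with a stand-in cleanup body:

import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.util.ThreadUtils

// A separated thread to clean up the workDir
val cleanupThread = ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread")

// Used to provide the implicit parameter of `Future` methods.
implicit val cleanupThreadExecutor: ExecutionContext =
  ExecutionContext.fromExecutor(cleanupThread)

Future {
  // stand-in: delete stale application directories under workDir
}
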
18 changes: 0 additions & 18 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -896,24 +896,6 @@ private[spark] object Utils extends Logging {
     hostPortParseResults.get(hostPort)
   }
 
-  /**
-   * Wrapper over newSingleThreadExecutor. Thread names are formatted as prefix-ID, where ID is a
-   * unique, sequentially assigned integer.
-   */
-  def newDaemonSingleThreadExecutor(prefix: String): ExecutorService = {
-    val threadFactory = namedThreadFactory(prefix)
-    Executors.newSingleThreadExecutor(threadFactory)
-  }
-
-  /**
-   * Wrapper over newSingleThreadScheduledExecutor. Thread names are formatted as prefix-ID, where
-   * ID is a unique, sequentially assigned integer.
-   */
-  def newDaemonSingleThreadScheduledExecutor(prefix: String): ScheduledExecutorService = {
-    val threadFactory = namedThreadFactory(prefix)
-    Executors.newSingleThreadScheduledExecutor(threadFactory)
-  }
-
   /**
    * Return the string to tell how long has passed in milliseconds.
    */
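
The two wrappers deleted here are thin: each builds a named daemon ThreadFactory and hands it to the matching java.util.concurrent.Executors factory. Reassembled from the removed lines, plus namedThreadFactory as Spark implements it with Guava's ThreadFactoryBuilder (that last detail comes from the Spark code base, not from this diff):

import java.util.concurrent.{Executors, ExecutorService, ScheduledExecutorService, ThreadFactory}

import com.google.common.util.concurrent.ThreadFactoryBuilder

/** Daemon threads named prefix-0, prefix-1, ...; each factory has its own counter. */
def namedThreadFactory(prefix: String): ThreadFactory =
  new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()

/** Wrapper over newSingleThreadExecutor; the single thread is a named daemon. */
def newDaemonSingleThreadExecutor(prefix: String): ExecutorService =
  Executors.newSingleThreadExecutor(namedThreadFactory(prefix))

/** Wrapper over newSingleThreadScheduledExecutor; same naming scheme. */
def newDaemonSingleThreadScheduledExecutor(prefix: String): ScheduledExecutorService =
  Executors.newSingleThreadScheduledExecutor(namedThreadFactory(prefix))
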

0 comments on commit 25a84d8
