diff --git a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
index b6c484bfe1967..5332510e87caf 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
@@ -326,7 +326,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
     private var blocksInRequestBitVector = new BitSet(totalBlocks)
 
     override def run() {
-      var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
+      var threadPool = Utils.newDaemonFixedThreadPool(
+        MultiTracker.MaxChatSlots, "Bit Torrent Chatter")
 
       while (hasBlocks.get < totalBlocks) {
         var numThreadsToCreate = 0
@@ -736,7 +737,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
     private var setOfCompletedSources = Set[SourceInfo]()
 
     override def run() {
-      var threadPool = Utils.newDaemonCachedThreadPool()
+      var threadPool = Utils.newDaemonCachedThreadPool("Bit torrent guide multiple requests")
       var serverSocket: ServerSocket = null
 
       serverSocket = new ServerSocket(0)
@@ -927,7 +928,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
   class ServeMultipleRequests
   extends Thread with Logging {
     // Server at most MultiTracker.MaxChatSlots peers
-    var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
+    var threadPool = Utils.newDaemonFixedThreadPool(
+      MultiTracker.MaxChatSlots, "Bit torrent serve multiple requests")
 
     override def run() {
       var serverSocket = new ServerSocket(0)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
index 21ec94659e601..82ed64f190b4f 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
@@ -137,7 +137,7 @@ extends Logging {
   class TrackMultipleValues
   extends Thread with Logging {
     override def run() {
-      var threadPool = Utils.newDaemonCachedThreadPool()
+      var threadPool = Utils.newDaemonCachedThreadPool("Track multiple values")
       var serverSocket: ServerSocket = null
 
       serverSocket = new ServerSocket(DriverTrackerPort)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
index e6674d49a7226..51af80a35ebec 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
@@ -291,7 +291,7 @@ extends Broadcast[T](id) with Logging with Serializable {
     private var setOfCompletedSources = Set[SourceInfo]()
 
     override def run() {
-      var threadPool = Utils.newDaemonCachedThreadPool()
+      var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast guide multiple requests")
       var serverSocket: ServerSocket = null
 
       serverSocket = new ServerSocket(0)
@@ -493,7 +493,7 @@ extends Broadcast[T](id) with Logging with Serializable {
   class ServeMultipleRequests
   extends Thread with Logging {
 
-    var threadPool = Utils.newDaemonCachedThreadPool()
+    var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast serve multiple requests")
 
     override def run() {
       var serverSocket = new ServerSocket(0)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 20323ea038fc2..032eb04f43f0a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -121,8 +121,7 @@ private[spark] class Executor(
   }
 
   // Start worker thread pool
-  val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory)
+  val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
 
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index e15a839c4e7a7..9c2fee4023be6 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -79,7 +79,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
   private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
   private val registerRequests = new SynchronizedQueue[SendingConnection]
 
-  implicit val futureExecContext = ExecutionContext.fromExecutor(Utils.newDaemonCachedThreadPool())
+  implicit val futureExecContext = ExecutionContext.fromExecutor(
+    Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
 
   private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
index feec8ecfe499b..4312c46cc190c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
@@ -24,33 +24,16 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
 import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.util.Utils
 
 /**
  * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
  */
 private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
   extends Logging {
-  private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt
-  private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt
-  private val getTaskResultExecutor = new ThreadPoolExecutor(
-    MIN_THREADS,
-    MAX_THREADS,
-    0L,
-    TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable],
-    new ResultResolverThreadFactory)
-
-  class ResultResolverThreadFactory extends ThreadFactory {
-    private var counter = 0
-    private var PREFIX = "Result resolver thread"
-
-    override def newThread(r: Runnable): Thread = {
-      val thread = new Thread(r, "%s-%s".format(PREFIX, counter))
-      counter += 1
-      thread.setDaemon(true)
-      return thread
-    }
-  }
+  private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
+  private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
+    THREADS, "Result resolver thread")
 
   protected val serializer = new ThreadLocal[SerializerInstance] {
     override def initialValue(): SerializerInstance = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f384875cc9af8..a3b3968c5e451 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -447,14 +447,17 @@ private[spark] object Utils extends Logging {
     hostPortParseResults.get(hostPort)
   }
 
-  private[spark] val daemonThreadFactory: ThreadFactory =
-    new ThreadFactoryBuilder().setDaemon(true).build()
+  private val daemonThreadFactoryBuilder: ThreadFactoryBuilder =
+    new ThreadFactoryBuilder().setDaemon(true)
 
   /**
-   * Wrapper over newCachedThreadPool.
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(): ThreadPoolExecutor =
-    Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+    val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
 
   /**
    * Return the string to tell how long has passed in seconds. The passing parameter should be in
@@ -465,10 +468,13 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Wrapper over newFixedThreadPool.
+   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
    */
-  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
-    Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+    val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
 
   private def listFilesSafely(file: File): Seq[File] = {
     val files = file.listFiles()
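For illustration, not part of the patch: a minimal, standalone sketch of the naming behavior the new Utils helpers provide, built directly on Guava's ThreadFactoryBuilder (which Utils.scala above already uses). The object name ThreadNamingDemo and the "demo-pool" prefix are hypothetical; setNameFormat(prefix + "-%d") is what yields the prefix-0, prefix-1, ... names described in the updated scaladoc.

import java.util.concurrent.{Executors, TimeUnit}

import com.google.common.util.concurrent.ThreadFactoryBuilder

object ThreadNamingDemo {
  def main(args: Array[String]) {
    // Build a daemon thread factory the same way the new helpers do:
    // "%d" in the name format is replaced with a per-factory counter,
    // so threads come out named demo-pool-0, demo-pool-1, ...
    val threadFactory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat("demo-pool" + "-%d")
      .build()
    val pool = Executors.newFixedThreadPool(2, threadFactory)

    // Each task reports the named thread that ran it; these names are what
    // make jstack dumps and log lines attributable to a specific pool.
    (1 to 4).foreach { i =>
      pool.submit(new Runnable {
        override def run() {
          println("task " + i + " ran on " + Thread.currentThread().getName)
        }
      })
    }

    pool.shutdown()
    // Daemon threads die at JVM exit, so wait for the tasks to finish.
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}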