-
Notifications
You must be signed in to change notification settings - Fork 3.1k
/
ThreadPoolFactory.scala
154 lines (125 loc) · 6.15 KB
/
ThreadPoolFactory.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/*
* Scala (https://www.scala-lang.org)
*
* Copyright EPFL and Lightbend, Inc.
*
* Licensed under Apache License 2.0
* (http://www.apache.org/licenses/LICENSE-2.0).
*
* See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/
package scala.tools.nsc.profile
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import scala.tools.nsc.{Global, Phase}
sealed trait ThreadPoolFactory {
def newUnboundedQueueFixedThreadPool(
nThreads: Int,
shortId: String,
priority: Int = Thread.NORM_PRIORITY): ThreadPoolExecutor
def newBoundedQueueFixedThreadPool(
nThreads: Int,
maxQueueSize: Int,
rejectHandler: RejectedExecutionHandler,
shortId: String,
priority: Int = Thread.NORM_PRIORITY): ThreadPoolExecutor
}
object ThreadPoolFactory {
def apply(global: Global, phase: Phase): ThreadPoolFactory = global.currentRun.profiler match {
case NoOpProfiler => new BasicThreadPoolFactory(phase)
case r: RealProfiler => new ProfilingThreadPoolFactory(phase, r)
}
private abstract class BaseThreadPoolFactory(phase: Phase) extends ThreadPoolFactory {
val baseGroup = new ThreadGroup(s"scalac-${phase.name}")
private def childGroup(name: String) = new ThreadGroup(baseGroup, name)
// Invoked when a new `Worker` is created, see `CommonThreadFactory.newThread`
protected def wrapWorker(worker: Runnable, shortId: String): Runnable = worker
protected final class CommonThreadFactory(
shortId: String,
daemon: Boolean = true,
priority: Int) extends ThreadFactory {
private val group: ThreadGroup = childGroup(shortId)
private val threadNumber: AtomicInteger = new AtomicInteger(1)
private val namePrefix = s"${baseGroup.getName}-$shortId-"
// Invoked by the `ThreadPoolExecutor` when creating a new worker thread. The argument
// runnable is the `Worker` (which extends `Runnable`). Its `run` method gets tasks from
// the thread pool and executes them (on the thread created here).
override def newThread(worker: Runnable): Thread = {
val wrapped = wrapWorker(worker, shortId)
val t: Thread = new Thread(group, wrapped, namePrefix + threadNumber.getAndIncrement, 0)
if (t.isDaemon != daemon) t.setDaemon(daemon)
if (t.getPriority != priority) t.setPriority(priority)
t
}
}
}
private final class BasicThreadPoolFactory(phase: Phase) extends BaseThreadPoolFactory(phase) {
override def newUnboundedQueueFixedThreadPool(nThreads: Int, shortId: String, priority: Int): ThreadPoolExecutor = {
val threadFactory = new CommonThreadFactory(shortId, priority = priority)
//like Executors.newFixedThreadPool
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory)
}
override def newBoundedQueueFixedThreadPool(nThreads: Int, maxQueueSize: Int, rejectHandler: RejectedExecutionHandler, shortId: String, priority: Int): ThreadPoolExecutor = {
val threadFactory = new CommonThreadFactory(shortId, priority = priority)
//like Executors.newFixedThreadPool
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](maxQueueSize), threadFactory, rejectHandler)
}
}
private class ProfilingThreadPoolFactory(phase: Phase, profiler: RealProfiler) extends BaseThreadPoolFactory(phase) {
override def newUnboundedQueueFixedThreadPool(nThreads: Int, shortId: String, priority: Int): ThreadPoolExecutor = {
val threadFactory = new CommonThreadFactory(shortId, priority = priority)
//like Executors.newFixedThreadPool
new SinglePhaseInstrumentedThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory, new AbortPolicy)
}
override def newBoundedQueueFixedThreadPool(nThreads: Int, maxQueueSize: Int, rejectHandler: RejectedExecutionHandler, shortId: String, priority: Int): ThreadPoolExecutor = {
val threadFactory = new CommonThreadFactory(shortId, priority = priority)
//like Executors.newFixedThreadPool
new SinglePhaseInstrumentedThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](maxQueueSize), threadFactory, rejectHandler)
}
override protected def wrapWorker(worker: Runnable, shortId: String): Runnable = () => {
val data = new ThreadProfileData
localData.set(data)
val profileStart = RealProfiler.snapThread(0)
try worker.run finally {
val snap = RealProfiler.snapThread(data.idleNs)
val threadRange = ProfileRange(profileStart, snap, phase, shortId, data.taskCount, Thread.currentThread())
profiler.completeBackground(threadRange)
}
}
/**
* data for thread run. Not threadsafe, only written from a single thread
*/
final class ThreadProfileData {
var firstStartNs = 0L
var taskCount = 0
var idleNs = 0L
var runningNs = 0L
var lastStartNs = 0L
var lastEndNs = 0L
}
val localData = new ThreadLocal[ThreadProfileData]
private class SinglePhaseInstrumentedThreadPoolExecutor(
corePoolSize: Int, maximumPoolSize: Int, keepAliveTime: Long, unit: TimeUnit,
workQueue: BlockingQueue[Runnable], threadFactory: ThreadFactory, handler: RejectedExecutionHandler)
extends ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler) {
override def beforeExecute(t: Thread, r: Runnable): Unit = {
val data = localData.get
data.taskCount += 1
val now = System.nanoTime()
if (data.firstStartNs == 0) data.firstStartNs = now
else data.idleNs += now - data.lastEndNs
data.lastStartNs = now
super.beforeExecute(t, r)
}
override def afterExecute(r: Runnable, t: Throwable): Unit = {
val now = System.nanoTime()
val data = localData.get
data.lastEndNs = now
data.runningNs += now - data.lastStartNs
super.afterExecute(r, t)
}
}
}
}