// AsyncExecutor.scala
package slick.util
import java.io.Closeable
import java.lang.management.ManagementFactory
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.control.NonFatal
import slick.util.AsyncExecutor.Priority
import javax.management.{InstanceNotFoundException, ObjectName}
/** An executor for running blocking I/O actions asynchronously.
  *
  * It backs the asynchronous query execution API when the underlying back-end (for example
  * JDBC) is blocking. */
trait AsyncExecutor extends Closeable {

  /** The [[scala.concurrent.ExecutionContext]] on which Futures are run. */
  def executionContext: ExecutionContext

  /** Shut the thread pool down and try to stop running computations. Afterwards the thread
    * pool is in a state where it does not accept any new jobs. */
  def close(): Unit

  /** Wrap a task in an [[AsyncExecutor.PrioritizedRunnable]].
    *
    * @param priority The priority of the task; passed by name, so it is re-evaluated each
    *                 time the resulting runnable is asked for its priority.
    * @param run      The task body. It receives a callback with which it can signal that the
    *                 connection has been released.
    * @return the runnable produced by the `AsyncExecutor.PrioritizedRunnable` factory. */
  def prioritizedRunnable(
    priority: => Priority,
    run: AsyncExecutor.PrioritizedRunnable.SetConnectionReleased => Unit
  ): AsyncExecutor.PrioritizedRunnable =
    AsyncExecutor.PrioritizedRunnable.apply(priority, run)
}
object AsyncExecutor extends Logging {
/** Create an [[AsyncExecutor]] with a fixed-size thread pool suitable for blocking
  * I/O. New threads are created as daemon threads.
  *
  * For a direct hand-off queue (`queueSize == 0`) or an unlimited queue (`queueSize == -1`)
  * the number of connections is left unlimited (`Int.MaxValue`); for any other queue size it
  * is set to `numThreads`.
  *
  * @param name A prefix to use for the names of the created threads.
  * @param numThreads The number of threads in the pool (used as both minimum and maximum).
  * @param queueSize The size of the job queue, 0 for direct hand-off or -1 for unlimited size. */
def apply(name: String, numThreads: Int, queueSize: Int): AsyncExecutor = {
  // The full factory only accepts queueSize 0 and -1 together with
  // maxConnections == Int.MaxValue, so BOTH special queue sizes must map to an unlimited
  // connection count. Mapping only -1 made `apply(name, n, 0)` fail its own precondition.
  val connectionLimit =
    if (queueSize == 0 || queueSize == -1) Int.MaxValue else numThreads
  apply(
    name = name,
    minThreads = numThreads,
    maxThreads = numThreads,
    queueSize = queueSize,
    maxConnections = connectionLimit,
    keepAliveTime = 1.minute,
    registerMbeans = false
  )
}
/** Create an [[AsyncExecutor]] with a thread pool suitable for blocking
  * I/O. New threads are created as daemon threads. Idle excess threads are kept alive for one
  * minute and no MXBean is registered.
  *
  * @param name A prefix to use for the names of the created threads.
  * @param minThreads The number of core threads in the pool.
  * @param maxThreads The maximum number of threads in the pool.
  * @param queueSize The size of the job queue, 0 for direct hand-off or -1 for unlimited size.
  * @param maxConnections The maximum number of configured connections for the connection pool.
  *                       The underlying ThreadPoolExecutor will not pick up any more work when
  *                       all connections are in use. It will resume as soon as a connection is
  *                       released again to the pool. */
def apply(name: String, minThreads: Int, maxThreads: Int, queueSize: Int, maxConnections: Int): AsyncExecutor = {
  val idleTimeout = 1.minute
  apply(
    name = name,
    minThreads = minThreads,
    maxThreads = maxThreads,
    queueSize = queueSize,
    maxConnections = maxConnections,
    keepAliveTime = idleTimeout,
    registerMbeans = false
  )
}
/** Create an [[AsyncExecutor]] with a thread pool suitable for blocking
  * I/O. New threads are created as daemon threads. Idle excess threads are kept alive for one
  * minute.
  *
  * @param name A prefix to use for the names of the created threads.
  * @param minThreads The number of core threads in the pool.
  * @param maxThreads The maximum number of threads in the pool.
  * @param queueSize The size of the job queue, 0 for direct hand-off or -1 for unlimited size.
  * @param maxConnections The maximum number of configured connections for the connection pool.
  *                       The underlying ThreadPoolExecutor will not pick up any more work when
  *                       all connections are in use. It will resume as soon as a connection is
  *                       released again to the pool.
  * @param registerMbeans If set to true, register an MXBean that provides insight into the
  *                       current queue and thread pool workload. */
def apply(
  name: String,
  minThreads: Int,
  maxThreads: Int,
  queueSize: Int,
  maxConnections: Int,
  registerMbeans: Boolean
): AsyncExecutor = {
  val idleTimeout = 1.minute
  apply(
    name = name,
    minThreads = minThreads,
    maxThreads = maxThreads,
    queueSize = queueSize,
    maxConnections = maxConnections,
    keepAliveTime = idleTimeout,
    registerMbeans = registerMbeans
  )
}
/** Create an [[AsyncExecutor]] with a thread pool suitable for blocking
  * I/O. New threads are created as daemon threads.
  *
  * The kind of job queue is selected by `queueSize`:
  *  - `0`: direct hand-off (`SynchronousQueue`); requires `maxConnections == Int.MaxValue`.
  *  - `-1`: unlimited (`LinkedBlockingQueue`); requires `maxConnections == Int.MaxValue`.
  *  - `> 0`: bounded, priority-aware `ManagedArrayBlockingQueue`; requires
  *    `minThreads == maxThreads` and `maxThreads <= maxConnections`.
  *
  * Any other value is rejected.
  *
  * @param name A prefix to use for the names of the created threads.
  * @param minThreads The number of core threads in the pool.
  * @param maxThreads The maximum number of threads in the pool.
  * @param queueSize The size of the job queue, 0 for direct hand-off or -1 for unlimited size.
  * @param maxConnections The maximum number of configured connections for the connection pool.
  *                       The underlying ThreadPoolExecutor will not pick up any more work when
  *                       all connections are in use. It will resume as soon as a connection is
  *                       released again to the pool.
  * @param keepAliveTime When the number of threads is greater than the core, this is the
  *                      maximum time that excess idle threads will wait for new tasks before
  *                      terminating.
  * @param registerMbeans If set to true, register an MXBean that provides insight into the
  *                       current queue and thread pool workload.
  * @throws java.lang.IllegalArgumentException if one of the constraints listed above is violated. */
def apply(name: String,
          minThreads: Int,
          maxThreads: Int,
          queueSize: Int,
          maxConnections: Int,
          keepAliveTime: Duration,
          registerMbeans: Boolean): AsyncExecutor = {

  // Local subclass that fixes every constructor argument except the queue implementation.
  class AsyncExecutorImpl(queue: BlockingQueue[Runnable]) extends DefaultAsyncExecutor(
    name = name,
    minThreads = minThreads,
    maxThreads = maxThreads,
    queue = queue,
    queueSize = queueSize,
    maxConnections = maxConnections,
    keepAliveTime = keepAliveTime,
    registerMbeans = registerMbeans
  )

  queueSize match {
    case 0 =>
      // NOTE: SynchronousQueue does not schedule high-priority tasks before others and so it cannot be used when
      //  the number of connections is limited (lest high-priority tasks may be holding all connections and low/mid
      //  priority tasks all threads -- resulting in a deadlock).
      require(
        maxConnections == Int.MaxValue,
        "When using queueSize == 0 (direct hand-off), maxConnections must be Int.MaxValue."
      )
      new AsyncExecutorImpl(new SynchronousQueue[Runnable])

    case -1 =>
      // NOTE: LinkedBlockingQueue does not schedule high-priority tasks before others and so it cannot be used when
      //  the number of connections is limited (lest high-priority tasks may be holding all connections and low/mid
      //  priority tasks all threads -- resulting in a deadlock).
      require(
        maxConnections == Int.MaxValue,
        "When using queueSize == -1 (unlimited), maxConnections must be Int.MaxValue."
      )
      new AsyncExecutorImpl(new LinkedBlockingQueue[Runnable])

    case n =>
      // Everything below assumes a positive capacity. Values below -1 have no documented
      // meaning, so fail fast with a clear message instead of handing a negative capacity to
      // the queue implementation.
      require(n > 0, s"queueSize must be 0 (direct hand-off), -1 (unlimited) or positive, but was $n.")

      // NOTE: The current implementation of ManagedArrayBlockingQueue is flawed. It makes the assumption that all
      //  tasks go through the queue (which is responsible for scheduling high-priority tasks first). However, that
      //  assumption is wrong since the ThreadPoolExecutor bypasses the queue when it creates new threads. This
      //  happens whenever it creates a new thread to run a task, i.e. when minThreads < maxThreads and the number
      //  of existing threads is < maxThreads.
      //
      //  The only way to prevent problems is to have minThreads == maxThreads when using the
      //  ManagedArrayBlockingQueue.
      require(minThreads == maxThreads, "When using queueSize > 0, minThreads == maxThreads is required.")

      // NOTE: The current implementation of ManagedArrayBlockingQueue.increaseInUseCount implicitly `require`s that
      //  maxThreads <= maxConnections.
      require(maxThreads <= maxConnections, "When using queueSize > 0, maxThreads <= maxConnections is required.")

      // NOTE: Adding up the above rules
      //  - maxThreads >= maxConnections, to prevent database locking issues when using transactions
      //  - maxThreads <= maxConnections, required by ManagedArrayBlockingQueue
      //  - maxThreads == minThreads, ManagedArrayBlockingQueue
      //
      //  We have maxThreads == minThreads == maxConnections as the only working configuration

      val managedArrayBlockingQueue = new ManagedArrayBlockingQueue(maxConnections, n)

      new AsyncExecutorImpl(managedArrayBlockingQueue.asInstanceOf[BlockingQueue[Runnable]]) {
        // Runnables created here cooperate with the managed queue: they reserve an in-use slot
        // before running (unless they already hold a connection) and clean up afterwards.
        override def prioritizedRunnable(priority0: => Priority,
                                         run0: PrioritizedRunnable.SetConnectionReleased => Unit
                                        ): PrioritizedRunnable =
          new PrioritizedRunnable {
            override def priority(): Priority = priority0

            private def runAndCleanUp(): Unit =
              try run0(new PrioritizedRunnable.SetConnectionReleased(() => connectionReleased = true))
              finally {
                // If the runnable/task has released the Jdbc connection we decrease the counter again
                if (!managedArrayBlockingQueue.attemptCleanUp(this))
                  logger.warn("After executing a task, the in-use count was already 0. This should not happen.")
              }

            override def run(): Unit =
              if (priority() == WithConnection)
                // A connection is already associated with this task; nothing to reserve.
                runAndCleanUp()
              else if (managedArrayBlockingQueue.attemptPrepare(this))
                runAndCleanUp()
              else {
                // The queue refused the reservation: hand the task back to the executor.
                logger.warn("Could not increase in-use count. Will resubmit...")
                executor.execute(this)
              }
          }
      }
  }
}
/** Create an [[AsyncExecutor]] with 20 threads (minimum and maximum), a job queue of size
  * 1000 and the given connection limit.
  *
  * @param name A prefix to use for the names of the created threads.
  * @param maxConnections The maximum number of configured connections for the connection pool. */
def default(name: String, maxConnections: Int): AsyncExecutor = {
  val poolSize = 20
  val jobQueueSize = 1000
  apply(
    name,
    minThreads = poolSize,
    maxThreads = poolSize,
    queueSize = jobQueueSize,
    maxConnections = maxConnections
  )
}
/** Create an [[AsyncExecutor]] with 20 threads and a job queue of size 1000.
  *
  * @param name A prefix to use for the names of the created threads. */
def default(name: String = "AsyncExecutor.default"): AsyncExecutor =
  apply(name = name, numThreads = 20, queueSize = 1000)
/** [[AsyncExecutor]] implementation backed by a `java.util.concurrent.ThreadPoolExecutor`.
  *
  * The thread pool is created lazily, the first time `executionContext` is accessed. The
  * life cycle is tracked in an atomic state: 0 = not initialized, 1 = initializing,
  * 2 = initialized, 3 = shutting down / shut down.
  *
  * @param name           Prefix for thread names; also part of the MBean object name.
  * @param minThreads     Core pool size passed to the ThreadPoolExecutor.
  * @param maxThreads     Maximum pool size passed to the ThreadPoolExecutor.
  * @param queue          The job queue handed to the ThreadPoolExecutor.
  * @param queueSize      The configured queue size, reported through the MXBean.
  * @param maxConnections The configured connection limit; only used here to warn when it
  *                       exceeds `maxThreads`.
  * @param keepAliveTime  Keep-alive time passed to the ThreadPoolExecutor (as milliseconds).
  * @param registerMbeans Whether to register an [[AsyncExecutorMXBean]] on initialization. */
private[slick] class DefaultAsyncExecutor(name: String,
                                          minThreads: Int,
                                          maxThreads: Int,
                                          private[slick] val queue: BlockingQueue[Runnable],
                                          queueSize: Int,
                                          maxConnections: Int,
                                          keepAliveTime: Duration,
                                          registerMbeans: Boolean) extends AsyncExecutor {

  @volatile private[this] lazy val mBeanName = new ObjectName(s"slick:type=AsyncExecutor,name=$name")

  // True only while THIS instance owns a registered MBean. Without it, an executor that
  // skipped registration (because the name was already taken) would still unregister the
  // other owner's bean when it is closed.
  @volatile private[this] var mBeanRegistered = false

  // Before init: 0, during init: 1, after init: 2, during/after shutdown: 3
  private[this] val state = new AtomicInteger(0)

  @volatile protected var executor: ThreadPoolExecutor = _

  if (maxConnections > maxThreads) {
    // NOTE: when using transactions or DB locks, it may happen that a task has a lock on the database but no thread
    //  to complete its action, while other tasks may have all the threads but are waiting for the first task to
    //  complete. This creates a deadlock.
    logger.warn(
      "Having maxConnection > maxThreads can result in deadlocks if transactions or database locks are used."
    )
  }

  /** The execution context backed by the thread pool. The pool (and, if requested, the MBean)
    * is created on first access.
    *
    * Throws `IllegalStateException` if the executor was closed before or during initialization. */
  lazy val executionContext = {
    if (!state.compareAndSet(0, 1))
      throw new IllegalStateException("Cannot initialize ExecutionContext; AsyncExecutor already shut down")
    val tf = new DaemonThreadFactory(name + "-")
    executor =
      new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime.toMillis, TimeUnit.MILLISECONDS, queue, tf)
    if (registerMbeans) {
      try {
        val mBeanServer = ManagementFactory.getPlatformMBeanServer
        if (mBeanServer.isRegistered(mBeanName))
          logger.warn(s"MBean $mBeanName already registered (AsyncExecutor names should be unique)")
        else {
          logger.debug(s"Registering MBean $mBeanName")
          mBeanServer.registerMBean(new AsyncExecutorMXBean {
            def getMaxQueueSize = queueSize
            def getQueueSize = queue.size()
            def getMaxThreads = maxThreads
            def getActiveThreads = executor.getActiveCount
          }, mBeanName)
          // Only reached when registerMBean did not throw: we now own the bean.
          mBeanRegistered = true
        }
      } catch { case NonFatal(ex) => logger.error("Error registering MBean", ex) }
    }
    if (!state.compareAndSet(1, 2)) {
      // close() ran while we were initializing: undo what we just set up.
      unregisterMbeans()
      executor.shutdownNow()
      throw new IllegalStateException(
        "Cannot initialize ExecutionContext; AsyncExecutor shut down during initialization")
    }
    new ExecutionContextExecutor {
      override def reportFailure(t: Throwable): Unit = loggingReporter(t)

      // Every command reaches the pool as a PrioritizedRunnable; plain runnables are wrapped
      // with the WithConnection priority.
      override def execute(command: Runnable): Unit = command match {
        case prioritized: PrioritizedRunnable =>
          executor.execute(prioritized)
        case other =>
          executor.execute(new PrioritizedRunnable {
            override def priority(): Priority = WithConnection
            override def run(): Unit = other.run()
          })
      }
    }
  }

  /** Unregister the MBean, but only if this instance registered it. Failures are logged. */
  private[this] def unregisterMbeans(): Unit = if (mBeanRegistered) {
    try {
      val mBeanServer = ManagementFactory.getPlatformMBeanServer
      logger.debug(s"Unregistering MBean $mBeanName")
      try mBeanServer.unregisterMBean(mBeanName) catch { case _: InstanceNotFoundException => }
    } catch { case NonFatal(ex) => logger.error("Error unregistering MBean", ex) }
    finally mBeanRegistered = false
  }

  /** Move to the shut-down state. If the pool had been fully initialized, unregister the MBean,
    * interrupt running tasks via `shutdownNow()` and wait up to 30 seconds for termination. */
  def close(): Unit = if (state.getAndSet(3) == 2) {
    unregisterMbeans()
    executor.shutdownNow()
    if (!executor.awaitTermination(30, TimeUnit.SECONDS))
      logger.warn("Abandoning ThreadPoolExecutor (not yet destroyed after 30 seconds)")
  }
}
/** The priority a [[PrioritizedRunnable]] reports through its `priority()` method.
* The trait is sealed; its only values are [[Fresh]], [[Continuation]] and [[WithConnection]]. */
sealed trait Priority
/** Fresh is used for database actions that are scheduled/queued for the first time. */
case object Fresh extends Priority
/** Continuation is used for database actions that are a continuation of some previously executed actions. */
case object Continuation extends Priority
/** WithConnection is used for database actions that already have a JDBC connection associated. */
case object WithConnection extends Priority
/** A `Runnable` that carries a [[Priority]] plus two mutable bookkeeping flags. */
sealed trait PrioritizedRunnable extends Runnable {

  /** The priority of this task. */
  def priority(): Priority

  /** true if the JDBC connection was released */
  var connectionReleased: Boolean = false

  /** true if the inUseCounter of the ManagedArrayBlockQueue was incremented */
  var inUseCounterSet: Boolean = false
}
/** Factory for [[PrioritizedRunnable]] instances. */
object PrioritizedRunnable {

  /** Callback handed to a task body; invoking it runs the wrapped function `f`. */
  class SetConnectionReleased(val f: () => Unit) extends AnyVal {
    def apply(): Unit = f()
  }

  /** Build a [[PrioritizedRunnable]] from a by-name priority and a task body.
    *
    * @param priority Evaluated anew on every call of the resulting runnable's `priority()`.
    * @param run      The task body; the callback it receives sets `connectionReleased` to
    *                 true on the resulting runnable. */
  def apply(priority: => Priority, run: SetConnectionReleased => Unit): PrioritizedRunnable = {
    // Inside the anonymous class below, `priority` and `run` name its own members, so the
    // arguments are captured under different names first.
    val currentPriority: () => Priority = () => priority
    val body: SetConnectionReleased => Unit = run
    new PrioritizedRunnable {
      override def priority(): Priority = currentPriority()
      override def run(): Unit = {
        val markReleased = new SetConnectionReleased(() => connectionReleased = true)
        body(markReleased)
      }
    }
  }
}
/** A `ThreadFactory` producing daemon threads of normal priority, named `namePrefix`
  * followed by a counter that starts at 1.
  *
  * @param namePrefix The prefix for the names of the created threads. */
private class DaemonThreadFactory(namePrefix: String) extends ThreadFactory {

  // Use the security manager's thread group when one is installed, otherwise the group of
  // the thread that creates this factory.
  private[this] val group: ThreadGroup =
    Option(System.getSecurityManager) match {
      case Some(securityManager) => securityManager.getThreadGroup
      case None                  => Thread.currentThread.getThreadGroup
    }

  private[this] val threadNumber = new AtomicInteger(1)

  def newThread(r: Runnable): Thread = {
    val threadName = namePrefix + threadNumber.getAndIncrement
    val thread = new Thread(group, r, threadName, 0)
    // Only touch the settings that differ from the desired ones.
    if (!thread.isDaemon) {
      thread.setDaemon(true)
    }
    if (thread.getPriority != Thread.NORM_PRIORITY) {
      thread.setPriority(Thread.NORM_PRIORITY)
    }
    thread
  }
}
/** An `Executor` that starts a fresh daemon thread, named "shutdownExecutor", for every
  * command. It is useful for wrapping synchronous `close` calls for asynchronous `shutdown`
  * operations. */
private[slick] val shutdownExecutor: Executor = new Executor {
  override def execute(command: Runnable): Unit = {
    val worker = new Thread(command)
    worker.setName("shutdownExecutor")
    worker.setDaemon(true)
    worker.start()
  }
}
/** Failure reporter that logs the given throwable as a warning. */
private val loggingReporter: Throwable => Unit = { failure =>
  logger.warn("Execution of asynchronous I/O action failed", failure)
}
}
/** The information that is exposed by an [[AsyncExecutor]] via JMX.
* NOTE(review): the getter names presumably determine the JMX attribute names, so they should
* not be renamed without checking consumers. */
trait AsyncExecutorMXBean {
/** Get the configured maximum queue size (0 for direct hand-off, -1 for unlimited). */
def getMaxQueueSize: Int
/** Get the current number of DBIOActions in the queue (waiting to be executed). */
def getQueueSize: Int
/** Get the configured maximum number of database I/O threads. */
def getMaxThreads: Int
/** Get the number of database I/O threads that are currently executing a task. */
def getActiveThreads: Int
}