|
@@ -56,19 +56,61 @@ object AsyncExecutor extends Logging { |
|
|
|
|
|
@volatile private[this] var executor: ThreadPoolExecutor = _ |
|
|
|
|
|
if (maxConnections > maxThreads) { |
|
|
// NOTE: when using transactions or DB locks, it may happen that a task has a lock on the database but no thread |
|
|
// to complete its action, while other tasks may have all the threads but are waiting for the first task to |
|
|
// complete. This creates a deadlock. |
|
|
logger.warn("Having maxConnection > maxThreads can result in deadlocks if transactions or database locks are used.") |
|
|
} |
|
|
|
|
|
lazy val executionContext = { |
|
|
if(!state.compareAndSet(0, 1)) |
|
|
throw new IllegalStateException("Cannot initialize ExecutionContext; AsyncExecutor already shut down") |
|
|
val queue: BlockingQueue[Runnable] = queueSize match { |
|
|
case 0 => new SynchronousQueue[Runnable] |
|
|
case -1 => new LinkedBlockingQueue[Runnable] |
|
|
case n => new ManagedArrayBlockingQueue(maxConnections, n).asInstanceOf[BlockingQueue[Runnable]] |
|
|
case 0 => |
|
|
// NOTE: SynchronousQueue does not schedule high-priority tasks before others and so it cannot be used when |
|
|
// the number of connections is limited (lest high-priority tasks may be holding all connections and low/mid |
|
|
// priority tasks all threads -- resulting in a deadlock). |
|
|
require(maxConnections == Integer.MAX_VALUE, "When using queueSize == 0 (direct hand-off), maxConnections must be Integer.MAX_VALUE.") |
|
|
|
|
|
new SynchronousQueue[Runnable] |
|
|
case -1 => |
|
|
// NOTE: LinkedBlockingQueue does not schedule high-priority tasks before others and so it cannot be used when |
|
|
// the number of connections is limited (lest high-priority tasks may be holding all connections and low/mid |
|
|
// priority tasks all threads -- resulting in a deadlock). |
|
|
require(maxConnections == Integer.MAX_VALUE, "When using queueSize == -1 (unlimited), maxConnections must be Integer.MAX_VALUE.") |
|
|
|
|
|
new LinkedBlockingQueue[Runnable] |
|
|
case n => |
|
|
// NOTE: The current implementation of ManagedArrayBlockingQueue is flawned. It makes the assumption that all |
|
|
// tasks go through the queue (which is responsible for scheduling high-priority tasks first). However, that |
|
|
// assumption is wrong since the ThreadPoolExecutor bypasses the queue when it creates new threads. This |
|
|
// happens when ever it creates a new thread to run a task, i.e. when minThreads < maxThreads and the number |
|
|
// of existing threads is < maxThreads. |
|
|
// |
|
|
// The only way to prevent problems is to have minThreads == maxThreads when using the |
|
|
// ManagedArrayBlockingQueue. |
|
|
require(minThreads == maxThreads, "When using queueSize > 0, minThreads == maxThreads is required.") |
|
|
|
|
|
// NOTE: The current implementation of ManagedArrayBlockingQueue.increaseInUseCount implicitly `require`s that |
|
|
// maxThreads <= maxConnections. |
|
|
require(maxThreads <= maxConnections, "When using queueSize > 0, maxThreads <= maxConnections is required.") |
|
|
|
|
|
// NOTE: Adding up the above rules |
|
|
// - maxThreads >= maxConnections, to prevent database locking issues when using transactions |
|
|
// - maxThreads <= maxConnections, required by ManagedArrayBlockingQueue |
|
|
// - maxThreads == minThreads, ManagedArrayBlockingQueue |
|
|
// |
|
|
// We have maxThreads == minThreads == maxConnections as the only working configuration |
|
|
|
|
|
new ManagedArrayBlockingQueue(maxConnections, n).asInstanceOf[BlockingQueue[Runnable]] |
|
|
} |
|
|
val tf = new DaemonThreadFactory(name + "-") |
|
|
executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime.toMillis, TimeUnit.MILLISECONDS, queue, tf) { |
|
|
|
|
|
/** If the runnable/task is a low/medium priority item, we increase the items in use count, because first thing it will do |
|
|
* is open a Jdbc connection from the pool. */ |
|
|
* is open a Jdbc connection from the pool. |
|
|
*/ |
|
|
override def beforeExecute(t: Thread, r: Runnable): Unit = { |
|
|
(r, queue) match { |
|
|
case (pr: PrioritizedRunnable, q: ManagedArrayBlockingQueue[Runnable]) if pr.priority != WithConnection => q.increaseInUseCount(pr) |
|
|