Skip to content

Commit

Permalink
Provide more options for the AsyncExecutor
Browse files Browse the repository at this point in the history
Allow the user to fine-tune the underlying thread pool of the AsyncExecutor
  • Loading branch information
bomgar committed Apr 26, 2016
1 parent 6c638ea commit f3626f3
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions slick/src/main/scala/slick/util/AsyncExecutor.scala
Expand Up @@ -3,6 +3,7 @@ package slick.util
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent._
import scala.concurrent.duration._
import scala.concurrent.{Promise, Future, ExecutionContext}
import scala.util.control.NonFatal

Expand All @@ -23,7 +24,19 @@ object AsyncExecutor extends Logging {
* @param name A prefix to use for the names of the created threads.
* @param numThreads The number of threads in the pool.
* @param queueSize The size of the job queue, 0 for direct hand-off or -1 for unlimited size. */
def apply(name: String, numThreads: Int, queueSize: Int): AsyncExecutor = {
def apply(name: String, numThreads: Int, queueSize: Int): AsyncExecutor = apply(name, numThreads, numThreads, queueSize)

/** Create an [[AsyncExecutor]] with a thread pool suitable for blocking
* I/O. New threads are created as daemon threads.
*
* @param name A prefix to use for the names of the created threads.
* @param minThreads The number of core threads in the pool.
* @param maxThreads The maximum number of threads in the pool.
* @param queueSize The size of the job queue, 0 for direct hand-off or -1 for unlimited size.
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.*/
def apply(name: String, minThreads: Int, maxThreads: Int, queueSize: Int, keepAliveTime: Duration = 1.minute): AsyncExecutor = {
new AsyncExecutor {
// Before init: 0, during init: 1, after init: 2, during/after shutdown: 3
private[this] val state = new AtomicInteger(0)
Expand All @@ -45,7 +58,7 @@ object AsyncExecutor extends Logging {
}
}
val tf = new DaemonThreadFactory(name + "-")
executor = new ThreadPoolExecutor(numThreads, numThreads, 1, TimeUnit.MINUTES, queue, tf)
executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime.toMillis, TimeUnit.MILLISECONDS, queue, tf)
if(!state.compareAndSet(1, 2)) {
executor.shutdownNow()
throw new IllegalStateException("Cannot initialize ExecutionContext; AsyncExecutor shut down during initialization")
Expand Down

0 comments on commit f3626f3

Please sign in to comment.