Permalink
Browse files

Expose AsyncExecutor workload to monitoring via JMX

- Parse `registerMbeans` and `poolName` options in `JdbcBackend` and
  document that they apply to all pool implementations, not just
  HikariCP. The AsyncExecutor is now created with the configured
  `poolName` (instead of the config path) as its name.

- Register an implementation of the new `AsyncExecutorMXBean` interface
  as an MBean when creating an `AsyncExecutor` with `registerMbeans=true`.
  It is unregistered on `close()`. Any errors during registration or
  unregistration are logged as errors and ignored.

Fixes #1532. Test in MBeansTest.testMBeans.
  • Loading branch information...
szeiger committed Jun 30, 2016
1 parent 0a8a5d7 commit d3a933ceaff2c85979959f36abb211868ecfa838
@@ -84,3 +84,14 @@ h2mem1 = {
numThreads = 1
keepAliveConnection = true
}
// MBeans-enabled configuration, used by MBeansTest
mbeans = {
url = "jdbc:h2:mem:test_mbeans"
driver = org.h2.Driver
connectionPool = "HikariCP"
numThreads = 1
keepAliveConnection = true
registerMbeans = true
poolName = "myMbeansDatabase"
}
@@ -0,0 +1,60 @@
package slick.test.jdbc
import java.lang.management.ManagementFactory
import java.util.concurrent.TimeUnit
import javax.management.ObjectName
import org.junit.Assert._
import org.junit.Test
import slick.jdbc.H2Profile.api._
import scala.concurrent.{Future, Await}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.JavaConverters._
class MBeansTest {
@Test
def testMBeans: Unit = {
val poolName = "myMbeansDatabase"
val db = Database.forConfig("mbeans")
try {
db.ioExecutionContext // force initialization of AsyncExecutor
val mbeanServer = ManagementFactory.getPlatformMBeanServer
val beanNames =
mbeanServer.queryNames(new ObjectName("slick:*"), null).asScala ++
mbeanServer.queryNames(new ObjectName("com.zaxxer.hikari:*"), null).asScala
println("Bean Names:")
beanNames.foreach(n => println(s" $n"))
val aeBeanName = new ObjectName(s"slick:type=AsyncExecutor,name=$poolName")
val poolBeanName = new ObjectName(s"com.zaxxer.hikari:type=Pool ($poolName)")
val poolConfigBeanName = new ObjectName(s"com.zaxxer.hikari:type=PoolConfig ($poolName)")
mbeanServer.getMBeanInfo(aeBeanName)
mbeanServer.getMBeanInfo(poolBeanName)
mbeanServer.getMBeanInfo(poolConfigBeanName)
Await.result(db.run(sqlu"""create alias sleep for "java.lang.Thread.sleep""""), Duration(10, TimeUnit.SECONDS))
assertEquals(1, mbeanServer.getAttribute(aeBeanName, "MaxThreads"))
mbeanServer.getAttribute(aeBeanName, "ActiveThreads") // we expect 0 but the returned number is only an estimate
assertEquals(1000, mbeanServer.getAttribute(aeBeanName, "MaxQueueSize"))
assertEquals(0, mbeanServer.getAttribute(aeBeanName, "QueueSize"))
val actions = Seq(
sql"select 1, sleep(1000)".as[Int],
sql"select 1".as[Int],
sql"select 1".as[Int],
sql"select 1".as[Int]
)
val fs = actions.map(db.run)
val size = mbeanServer.getAttribute(aeBeanName, "QueueSize")
// Usually we expect 3 but under high load it's possible that the first action is still in the queue
assertTrue(s"size should be 3 or 4, is $size", size == 3 || size == 4)
Await.result(Future.sequence(fs), Duration(10, TimeUnit.SECONDS))
} finally db.close()
}
}
@@ -139,6 +139,13 @@ trait JdbcBackend extends RelationalBackend {
* actions which cannot be executed immediately when all threads are busy. Beyond this
* limit new actions fail immediately. Set to 0 for no queue (direct hand-off) or to -1
* for an unlimited queue size (not recommended).</li>
* <li>`registerMbeans` (Boolean, optional, default: false): Whether or not JMX Management
* Beans ("MBeans") are registered. Slick supports an MBean of its own for monitoring the
* `AsyncExecutor` with the thread pool and queue, but connection pool implementations
* may register additional MBeans. In particular, HikariCP does this.</li>
* <li>`poolName` (String, optional): A user-defined name for the connection pool in logging
* and JMX management consoles to identify pools and pool configurations. This defaults to
* the config path.</li>
* </ul>
*
* The pool is tuned for asynchronous execution by default. Apart from the connection
@@ -190,9 +197,6 @@ trait JdbcBackend extends RelationalBackend {
* pool will "fail fast" if the pool cannot be seeded with initial connections
* successfully. If connections cannot be created at pool startup time, a RuntimeException
* will be thrown. This property has no effect if `minConnections` is 0.</li>
* <li>`poolName` (String, optional): A user-defined name for the connection pool in logging
* and JMX management consoles to identify pools and pool configurations. This defaults to
* the config path.</li>
* <li>`leakDetectionThreshold` (Duration, optional, default: 0): The amount of time that a
* connection can be out of the pool before a message is logged indicating a possible
* connection leak. A value of 0 means leak detection is disabled. Lowest acceptable value
@@ -202,8 +206,6 @@ trait JdbcBackend extends RelationalBackend {
* database is still alive. It is database dependent and should be a query that takes very
* little processing by the database (e.g. "VALUES 1"). When not set, the JDBC4
* `Connection.isValid()` method is used instead (which is usually preferable).</li>
* <li>`registerMbeans` (Boolean, optional, default: false): Whether or not JMX Management
* Beans ("MBeans") are registered.</li>
* </ul>
*
* Direct connections are based on a `java.sql.DataSource` or a `java.sql.Driver`. This is
@@ -269,9 +271,12 @@ trait JdbcBackend extends RelationalBackend {
classLoader: ClassLoader = ClassLoaderUtil.defaultClassLoader): Database = {
val usedConfig = if(path.isEmpty) config else config.getConfig(path)
val source = JdbcDataSource.forConfig(usedConfig, driver, path, classLoader)
val poolName = usedConfig.getStringOr("poolName", path)
val numThreads = usedConfig.getIntOr("numThreads", 20)
val maxConnections = source.maxConnections.fold(numThreads*5)(identity)
val executor = AsyncExecutor(path, numThreads, numThreads, usedConfig.getIntOr("queueSize", 1000), maxConnections)
val registerMbeans = usedConfig.getBooleanOr("registerMbeans", false)
val executor = AsyncExecutor(poolName, numThreads, numThreads, usedConfig.getIntOr("queueSize", 1000),
maxConnections, registerMbeans = registerMbeans)
forSource(source, executor)
}
}
@@ -1,12 +1,16 @@
package slick.util
import java.io.Closeable
import java.lang.management.ManagementFactory
import java.util.concurrent._
import javax.management.{InstanceNotFoundException, ObjectName}
import scala.concurrent.duration._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal
/** A connection pool for asynchronous execution of blocking I/O actions.
* This is used for the asynchronous query execution API on top of blocking back-ends like JDBC. */
trait AsyncExecutor extends Closeable {
@@ -40,79 +44,103 @@ object AsyncExecutor extends Logging {
* Default is Integer.MAX_VALUE which is only ever a good choice when not using connection pooling
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.*/
def apply(name: String, minThreads: Int, maxThreads: Int, queueSize: Int, maxConnections: Int = Integer.MAX_VALUE, keepAliveTime: Duration = 1.minute): AsyncExecutor = {
new AsyncExecutor {
// Before init: 0, during init: 1, after init: 2, during/after shutdown: 3
private[this] val state = new AtomicInteger(0)
@volatile private[this] var executor: ThreadPoolExecutor = _
lazy val executionContext = {
if(!state.compareAndSet(0, 1))
throw new IllegalStateException("Cannot initialize ExecutionContext; AsyncExecutor already shut down")
val queue: BlockingQueue[Runnable] = queueSize match {
case 0 => new SynchronousQueue[Runnable]
case -1 => new LinkedBlockingQueue[Runnable]
case n => new ManagedArrayBlockingQueue(maxConnections, n).asInstanceOf[BlockingQueue[Runnable]]
}
val tf = new DaemonThreadFactory(name + "-")
executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime.toMillis, TimeUnit.MILLISECONDS, queue, tf) {
/**
* If the runnable/task is a low/medium priority item, we increase the items in use count, because first thing it will do
* is open a Jdbc connection from the pool.
*/
override def beforeExecute(t: Thread, r: Runnable): Unit = {
(r, queue) match {
case (pr: PrioritizedRunnable, q: ManagedArrayBlockingQueue[Runnable]) if pr.priority != WithConnection => q.increaseInUseCount(pr)
case _ =>
}
super.beforeExecute(t, r)
}
/**
* If the runnable/task has released the Jdbc connection we decrease the counter again
*/
override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)
(r, queue) match {
case (pr: PrioritizedRunnable, q: ManagedArrayBlockingQueue[Runnable]) =>
if (pr.connectionReleased && pr.priority != WithConnection) q.decreaseInUseCount()
pr.inUseCounterSet = false
case _ =>
}
* will wait for new tasks before terminating.
* @param registerMbeans If set to true, register an MXBean that provides insight into the current
* queue and thread pool workload. */
def apply(name: String, minThreads: Int, maxThreads: Int, queueSize: Int, maxConnections: Int = Integer.MAX_VALUE, keepAliveTime: Duration = 1.minute,
registerMbeans: Boolean = false): AsyncExecutor = new AsyncExecutor {
@volatile private[this] lazy val mbeanName = new ObjectName(s"slick:type=AsyncExecutor,name=$name");
// Before init: 0, during init: 1, after init: 2, during/after shutdown: 3
private[this] val state = new AtomicInteger(0)
@volatile private[this] var executor: ThreadPoolExecutor = _
lazy val executionContext = {
if(!state.compareAndSet(0, 1))
throw new IllegalStateException("Cannot initialize ExecutionContext; AsyncExecutor already shut down")
val queue: BlockingQueue[Runnable] = queueSize match {
case 0 => new SynchronousQueue[Runnable]
case -1 => new LinkedBlockingQueue[Runnable]
case n => new ManagedArrayBlockingQueue(maxConnections, n).asInstanceOf[BlockingQueue[Runnable]]
}
val tf = new DaemonThreadFactory(name + "-")
executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime.toMillis, TimeUnit.MILLISECONDS, queue, tf) {
/** If the runnable/task is a low/medium priority item, we increase the items in use count, because first thing it will do
* is open a Jdbc connection from the pool. */
override def beforeExecute(t: Thread, r: Runnable): Unit = {
(r, queue) match {
case (pr: PrioritizedRunnable, q: ManagedArrayBlockingQueue[Runnable]) if pr.priority != WithConnection => q.increaseInUseCount(pr)
case _ =>
}
}
if(!state.compareAndSet(1, 2)) {
executor.shutdownNow()
throw new IllegalStateException("Cannot initialize ExecutionContext; AsyncExecutor shut down during initialization")
super.beforeExecute(t, r)
}
new ExecutionContextExecutor {
override def reportFailure(t: Throwable): Unit = loggingReporter(t)
override def execute(command: Runnable): Unit = {
if (command.isInstanceOf[PrioritizedRunnable]) {
executor.execute(command)
} else {
executor.execute(new PrioritizedRunnable {
override val priority: Priority = WithConnection
override def run(): Unit = command.run()
})
}
/**
* If the runnable/task has released the Jdbc connection we decrease the counter again
*/
override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)
(r, queue) match {
case (pr: PrioritizedRunnable, q: ManagedArrayBlockingQueue[Runnable]) =>
if (pr.connectionReleased && pr.priority != WithConnection) q.decreaseInUseCount()
pr.inUseCounterSet = false
case _ =>
}
}
}
def close(): Unit = if(state.getAndSet(3) == 2) {
if(registerMbeans) {
try {
val mbeanServer = ManagementFactory.getPlatformMBeanServer
if(mbeanServer.isRegistered(mbeanName))
logger.warn(s"MBean $mbeanName already registered (AsyncExecutor names should be unique)")
else {
logger.debug(s"Registering MBean $mbeanName")
mbeanServer.registerMBean(new AsyncExecutorMXBean {
def getMaxQueueSize = queueSize
def getQueueSize = queue.size()
def getMaxThreads = maxThreads
def getActiveThreads = executor.getActiveCount
}, mbeanName)
}
} catch { case NonFatal(ex) => logger.error("Error registering MBean", ex) }
}
if(!state.compareAndSet(1, 2)) {
unregisterMbeans()
executor.shutdownNow()
if(!executor.awaitTermination(30, TimeUnit.SECONDS))
logger.warn("Abandoning ThreadPoolExecutor (not yet destroyed after 30 seconds)")
throw new IllegalStateException("Cannot initialize ExecutionContext; AsyncExecutor shut down during initialization")
}
new ExecutionContextExecutor {
override def reportFailure(t: Throwable): Unit = loggingReporter(t)
override def execute(command: Runnable): Unit = {
if (command.isInstanceOf[PrioritizedRunnable]) {
executor.execute(command)
} else {
executor.execute(new PrioritizedRunnable {
override val priority: Priority = WithConnection
override def run(): Unit = command.run()
})
}
}
}
}
private[this] def unregisterMbeans(): Unit = if(registerMbeans) {
try {
val mbeanServer = ManagementFactory.getPlatformMBeanServer
logger.debug(s"Unregistering MBean $mbeanName")
try mbeanServer.unregisterMBean(mbeanName) catch { case _: InstanceNotFoundException => }
} catch { case NonFatal(ex) => logger.error("Error unregistering MBean", ex) }
}
def close(): Unit = if(state.getAndSet(3) == 2) {
unregisterMbeans()
executor.shutdownNow()
if(!executor.awaitTermination(30, TimeUnit.SECONDS))
logger.warn("Abandoning ThreadPoolExecutor (not yet destroyed after 30 seconds)")
}
}
@@ -162,3 +190,15 @@ object AsyncExecutor extends Logging {
logger.warn("Execution of asynchronous I/O action failed", t)
}
}
/** The information that is exposed by an [[AsyncExecutor]] via JMX. */
trait AsyncExecutorMXBean {
/** Get the configured maximum queue size (0 for direct hand-off, -1 for unlimited) */
def getMaxQueueSize: Int
/** Get the current number of DBIOActions in the queue (waiting to be executed) */
def getQueueSize: Int
/** Get the configured maximum number of database I/O threads */
def getMaxThreads: Int
/** Get the number of database I/O threads that are currently executing a task */
def getActiveThreads: Int
}
@@ -56,3 +56,33 @@ The following loggers are particularly interesting:
``slick.jdbc.StatementInvoker.result``
Shows the first rows of the result set when a query is executed. Does not apply to streaming results.
.. index:: monitoring
Monitoring
==========
When a :doc:`Database <database>` object has the ``registerMbeans`` option enabled (see
:api:`Database.forConfig <slick.jdbc.JdbcBackend$DatabaseFactoryDef@forConfig(String,Config,Driver,ClassLoader):Database>`),
Slick registers a JMX_ management bean of type :api:`slick.util.AsyncExecutorMXBean` that provides information about the
current workload of the database I/O thread pool and the task queue.
Connection pool implementations may also honor this option and register additional management beans. In particular,
the default HikariCP_ pool implementation does this. See `HikariCP Monitoring`_ in the HikariCP documentation for
details.
The management bean names are qualified with the ``poolName`` from the database configuration, or the config path
if the ``poolName`` has not been set explicitly.
Example: Including the following configuration options in the database configuration
.. parsed-literal::
connectionPool = "HikariCP"
registerMbeans = true
poolName = "myDb"
results in these three management beans being registered:
- ``slick:type=AsyncExecutor,name=myDb``
- ``com.zaxxer.hikari:type=PoolConfig (myDb)``
- ``com.zaxxer.hikari:type=Pool (myDb)``
@@ -2,17 +2,16 @@
.. _SLF4J: http://www.slf4j.org/
.. _Logback: http://logback.qos.ch/
.. _JDBC: http://en.wikipedia.org/wiki/Java_Database_Connectivity
.. _Typesafe Subscription Agreement: http://typesafe.com/public/legal/TypesafeSubscriptionAgreement.pdf
.. _Typesafe Subscription: http://typesafe.com/subscription
.. _ScalaQuery: http://scalaquery.org
.. _Activator: https://typesafe.com/activator
.. _HikariCP: http://brettwooldridge.github.io/HikariCP/
.. _HikariCP Monitoring: https://github.com/brettwooldridge/HikariCP/wiki/MBean-(JMX)-Monitoring-and-Management
.. _About Pool Sizing: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing
.. _Typesafe Config: https://github.com/typesafehub/config
.. _Play: https://playframework.com/
.. _Akka: http://akka.io/
.. _Akka Streams: http://akka.io/docs/
.. _Akka Sphinx: http://doc.akka.io/docs/akka/2.4.0/dev/documentation.html
.. _About Pool Sizing: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing
.. _Reactive Streams: http://www.reactive-streams.org/
.. _JPA: http://en.wikipedia.org/wiki/Java_Persistence_API
.. _Reactive Manifesto: http://www.reactivemanifesto.org/
@@ -28,3 +27,4 @@
.. _SQLite: https://www.sqlite.org/
.. _Oracle: https://www.oracle.com/database/
.. _SQL Server: http://www.microsoft.com/en-us/server-cloud/products/sql-server/
.. _JMX: https://en.wikipedia.org/wiki/Java_Management_Extensions

0 comments on commit d3a933c

Please sign in to comment.