Skip to content

Commit

Permalink
= core: fixes #37
Browse files Browse the repository at this point in the history
  • Loading branch information
dpsoft committed Jun 6, 2014
1 parent d211435 commit c985fe9
Showing 1 changed file with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package akka.instrumentation

import org.aspectj.lang.annotation._
import akka.dispatch.{ ExecutorServiceDelegate, Dispatcher, MessageDispatcher }
import kamon.metrics.{ Metrics, DispatcherMetrics }
import akka.dispatch.{ExecutorServiceDelegate, Dispatcher, MessageDispatcher}
import kamon.metrics.{Metrics, DispatcherMetrics}
import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder
import kamon.Kamon
import akka.actor.{ Cancellable, ActorSystemImpl }
import akka.actor.{Cancellable, ActorSystemImpl}
import scala.concurrent.forkjoin.ForkJoinPool
import java.util.concurrent.ThreadPoolExecutor
import java.lang.reflect.Method
Expand Down Expand Up @@ -82,9 +82,14 @@ class DispatcherTracing {
def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {
val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]

dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
dispatcherWithMetrics.dispatcherMetricsRecorder.map {
dispatcher =>
dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
}
}


}

@Aspect
Expand All @@ -108,6 +113,7 @@ object DispatcherMetricsCollector {
DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount,
(pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize)
}

private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = {
DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize)
}
Expand All @@ -124,13 +130,13 @@ object DispatcherMetricsCollector {
case x: Dispatcher {
val executor = executorServiceMethod.invoke(x) match {
case delegate: ExecutorServiceDelegate delegate.executor
case other other
case other other
}

executor match {
case fjp: ForkJoinPool collectForkJoinMetrics(fjp)
case fjp: ForkJoinPool collectForkJoinMetrics(fjp)
case tpe: ThreadPoolExecutor collectExecutorMetrics(tpe)
case anything DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
case anything DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
}
}
case _ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
Expand Down

0 comments on commit c985fe9

Please sign in to comment.