From e92f2cb69fb72f3cc6acd00a4aee49d46a75e7b2 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 22 Jul 2020 15:23:31 +0200 Subject: [PATCH] fix the module shutdown order, reporters will always get a chance to report the last bit of data before being stopped. Fixes #533 --- .../scala/kamon/module/ModuleRegistry.scala | 73 +++++++++++-------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/core/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala b/core/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala index 2fb363416..03ba5111f 100644 --- a/core/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala +++ b/core/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala @@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicReference import com.typesafe.config.Config import kamon.module.Module.Registration import kamon.status.Status -import kamon.metric.MetricRegistry -import kamon.trace.Tracer +import kamon.metric.{MetricRegistry, PeriodSnapshot} +import kamon.trace.{Span, Tracer} import kamon.util.Clock import org.slf4j.LoggerFactory @@ -128,8 +128,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry: */ def stopModules(): Future[Unit] = synchronized { implicit val cleanupExecutor = ExecutionContext.Implicits.global - scheduleMetricsTicker(once = true) - scheduleSpansTicker(once = true) + stopReporterTickers() var stoppedSignals: List[Future[Unit]] = Nil _registeredModules.dropWhile { @@ -151,7 +150,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry: * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be * cancelled and scheduled again. */ - private def scheduleMetricsTicker(once: Boolean = false): Unit = { + private def scheduleMetricsTicker(): Unit = { val currentMetricsTicker = _metricsTickerSchedule.get() if(currentMetricsTicker != null) currentMetricsTicker.cancel(false) @@ -171,24 +170,14 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry: val currentInstant = Instant.now(clock) val periodSnapshot = metricRegistry.snapshot(resetState = true) - metricReporterModules().foreach { entry => - Future { - try entry.module.reportPeriodSnapshot(periodSnapshot) catch { case error: Throwable => - _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) - } - }(entry.executionContext) - } - + metricReporterModules().foreach(entry => scheduleMetricsTick(entry, periodSnapshot)) lastInstant = currentInstant } catch { case NonFatal(t) => _logger.error("Failed to run a metrics tick", t) } } - if(once) - _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS) - else - _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS) + _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS) } } @@ -197,7 +186,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry: * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be * cancelled and scheduled again. */ - private def scheduleSpansTicker(once: Boolean = false): Unit = { + private def scheduleSpansTicker(): Unit = { val currentSpansTicker = _spansTickerSchedule.get() if(currentSpansTicker != null) currentSpansTicker.cancel(false) @@ -208,27 +197,43 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry: val ticker = new Runnable { override def run(): Unit = try { val spanBatch = tracer.spans() - - spanReporterModules().foreach { entry => - Future { - try entry.module.reportSpans(spanBatch) catch { case error: Throwable => - _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error) - } - }(entry.executionContext) - } + spanReporterModules().foreach(entry => scheduleSpansBatch(entry, spanBatch)) } catch { case NonFatal(t) => _logger.error("Failed to run a spans tick", t) } } - if(once) - _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS) - else - _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS) + _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS) } } + private def scheduleMetricsTick(entry: Entry[MetricReporter], periodSnapshot: PeriodSnapshot): Unit = { + Future { + try entry.module.reportPeriodSnapshot(periodSnapshot) catch { case error: Throwable => + _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) + } + }(entry.executionContext) + } + + private def scheduleSpansBatch(entry: Entry[SpanReporter], spanBatch: Seq[Span.Finished]): Unit = { + Future { + try entry.module.reportSpans(spanBatch) catch { case error: Throwable => + _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error) + } + }(entry.executionContext) + } + + private def stopReporterTickers(): Unit = { + val currentMetricsTicker = _metricsTickerSchedule.get() + if(currentMetricsTicker != null) + currentMetricsTicker.cancel(false) + + val currentSpansTicker = _spansTickerSchedule.get() + if(currentSpansTicker != null) + currentSpansTicker.cancel(false) + } + private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized { _metricReporterModules.values } @@ -368,10 +373,14 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry: // Remove the module from all registries _registeredModules = _registeredModules - entry.name - if(entry.module.isInstanceOf[MetricReporter]) + if(entry.module.isInstanceOf[MetricReporter]) { _metricReporterModules = _metricReporterModules - entry.name - if(entry.module.isInstanceOf[SpanReporter]) + scheduleMetricsTick(entry.asInstanceOf[Entry[MetricReporter]], metricRegistry.snapshot(resetState = false)) + } + if(entry.module.isInstanceOf[SpanReporter]) { _spanReporterModules = _spanReporterModules - entry.name + scheduleSpansBatch(entry.asInstanceOf[Entry[SpanReporter]], tracer.spans()) + } // Schedule a call to stop on the module