Skip to content

Commit

Permalink
Merge pull request #811 from ivantopo/issue-533/fix-shutdown-order
Browse files Browse the repository at this point in the history
fix the module shutdown order, fixes #533
  • Loading branch information
ivantopo authored Jul 22, 2020
2 parents 13ebbee + e92f2cb commit 05613bf
Showing 1 changed file with 41 additions and 32 deletions.
73 changes: 41 additions & 32 deletions core/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicReference
import com.typesafe.config.Config
import kamon.module.Module.Registration
import kamon.status.Status
import kamon.metric.MetricRegistry
import kamon.trace.Tracer
import kamon.metric.{MetricRegistry, PeriodSnapshot}
import kamon.trace.{Span, Tracer}
import kamon.util.Clock
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -128,8 +128,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
*/
def stopModules(): Future[Unit] = synchronized {
implicit val cleanupExecutor = ExecutionContext.Implicits.global
scheduleMetricsTicker(once = true)
scheduleSpansTicker(once = true)
stopReporterTickers()

var stoppedSignals: List[Future[Unit]] = Nil
_registeredModules.dropWhile {
Expand All @@ -151,7 +150,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
* all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be
* cancelled and scheduled again.
*/
private def scheduleMetricsTicker(once: Boolean = false): Unit = {
private def scheduleMetricsTicker(): Unit = {
val currentMetricsTicker = _metricsTickerSchedule.get()
if(currentMetricsTicker != null)
currentMetricsTicker.cancel(false)
Expand All @@ -171,24 +170,14 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
val currentInstant = Instant.now(clock)
val periodSnapshot = metricRegistry.snapshot(resetState = true)

metricReporterModules().foreach { entry =>
Future {
try entry.module.reportPeriodSnapshot(periodSnapshot) catch { case error: Throwable =>
_logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
}
}(entry.executionContext)
}

metricReporterModules().foreach(entry => scheduleMetricsTick(entry, periodSnapshot))
lastInstant = currentInstant
} catch {
case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
}
}

if(once)
_metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
else
_metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS)
_metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS)
}
}

Expand All @@ -197,7 +186,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
* all available span reporting modules. If a ticker was already scheduled then that scheduled job will be
* cancelled and scheduled again.
*/
private def scheduleSpansTicker(once: Boolean = false): Unit = {
private def scheduleSpansTicker(): Unit = {
val currentSpansTicker = _spansTickerSchedule.get()
if(currentSpansTicker != null)
currentSpansTicker.cancel(false)
Expand All @@ -208,27 +197,43 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
val ticker = new Runnable {
override def run(): Unit = try {
val spanBatch = tracer.spans()

spanReporterModules().foreach { entry =>
Future {
try entry.module.reportSpans(spanBatch) catch { case error: Throwable =>
_logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
}
}(entry.executionContext)
}
spanReporterModules().foreach(entry => scheduleSpansBatch(entry, spanBatch))

} catch {
case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
}
}

if(once)
_spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
else
_spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS)
_spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS)
}
}

private def scheduleMetricsTick(entry: Entry[MetricReporter], periodSnapshot: PeriodSnapshot): Unit = {
Future {
try entry.module.reportPeriodSnapshot(periodSnapshot) catch { case error: Throwable =>
_logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
}
}(entry.executionContext)
}

private def scheduleSpansBatch(entry: Entry[SpanReporter], spanBatch: Seq[Span.Finished]): Unit = {
Future {
try entry.module.reportSpans(spanBatch) catch { case error: Throwable =>
_logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
}
}(entry.executionContext)
}

private def stopReporterTickers(): Unit = {
val currentMetricsTicker = _metricsTickerSchedule.get()
if(currentMetricsTicker != null)
currentMetricsTicker.cancel(false)

val currentSpansTicker = _spansTickerSchedule.get()
if(currentSpansTicker != null)
currentSpansTicker.cancel(false)
}

private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized {
_metricReporterModules.values
}
Expand Down Expand Up @@ -368,10 +373,14 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:

// Remove the module from all registries
_registeredModules = _registeredModules - entry.name
if(entry.module.isInstanceOf[MetricReporter])
if(entry.module.isInstanceOf[MetricReporter]) {
_metricReporterModules = _metricReporterModules - entry.name
if(entry.module.isInstanceOf[SpanReporter])
scheduleMetricsTick(entry.asInstanceOf[Entry[MetricReporter]], metricRegistry.snapshot(resetState = false))
}
if(entry.module.isInstanceOf[SpanReporter]) {
_spanReporterModules = _spanReporterModules - entry.name
scheduleSpansBatch(entry.asInstanceOf[Entry[SpanReporter]], tracer.spans())
}


// Schedule a call to stop on the module
Expand Down

0 comments on commit 05613bf

Please sign in to comment.