Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the module shutdown order, fixes #533 #811

Merged
merged 1 commit into from
Jul 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 41 additions & 32 deletions core/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicReference
import com.typesafe.config.Config
import kamon.module.Module.Registration
import kamon.status.Status
import kamon.metric.MetricRegistry
import kamon.trace.Tracer
import kamon.metric.{MetricRegistry, PeriodSnapshot}
import kamon.trace.{Span, Tracer}
import kamon.util.Clock
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -128,8 +128,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
*/
def stopModules(): Future[Unit] = synchronized {
implicit val cleanupExecutor = ExecutionContext.Implicits.global
scheduleMetricsTicker(once = true)
scheduleSpansTicker(once = true)
stopReporterTickers()

var stoppedSignals: List[Future[Unit]] = Nil
_registeredModules.dropWhile {
Expand All @@ -151,7 +150,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
* all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be
* cancelled and scheduled again.
*/
private def scheduleMetricsTicker(once: Boolean = false): Unit = {
private def scheduleMetricsTicker(): Unit = {
val currentMetricsTicker = _metricsTickerSchedule.get()
if(currentMetricsTicker != null)
currentMetricsTicker.cancel(false)
Expand All @@ -171,24 +170,14 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
val currentInstant = Instant.now(clock)
val periodSnapshot = metricRegistry.snapshot(resetState = true)

metricReporterModules().foreach { entry =>
Future {
try entry.module.reportPeriodSnapshot(periodSnapshot) catch { case error: Throwable =>
_logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
}
}(entry.executionContext)
}

metricReporterModules().foreach(entry => scheduleMetricsTick(entry, periodSnapshot))
lastInstant = currentInstant
} catch {
case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
}
}

if(once)
_metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
else
_metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS)
_metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS)
}
}

Expand All @@ -197,7 +186,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
* all available span reporting modules. If a ticker was already scheduled then that scheduled job will be
* cancelled and scheduled again.
*/
private def scheduleSpansTicker(once: Boolean = false): Unit = {
private def scheduleSpansTicker(): Unit = {
val currentSpansTicker = _spansTickerSchedule.get()
if(currentSpansTicker != null)
currentSpansTicker.cancel(false)
Expand All @@ -208,27 +197,43 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
val ticker = new Runnable {
override def run(): Unit = try {
val spanBatch = tracer.spans()

spanReporterModules().foreach { entry =>
Future {
try entry.module.reportSpans(spanBatch) catch { case error: Throwable =>
_logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
}
}(entry.executionContext)
}
spanReporterModules().foreach(entry => scheduleSpansBatch(entry, spanBatch))

} catch {
case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
}
}

if(once)
_spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
else
_spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS)
_spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS)
}
}

private def scheduleMetricsTick(entry: Entry[MetricReporter], periodSnapshot: PeriodSnapshot): Unit = {
Future {
try entry.module.reportPeriodSnapshot(periodSnapshot) catch { case error: Throwable =>
_logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
}
}(entry.executionContext)
}

private def scheduleSpansBatch(entry: Entry[SpanReporter], spanBatch: Seq[Span.Finished]): Unit = {
Future {
try entry.module.reportSpans(spanBatch) catch { case error: Throwable =>
_logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
}
}(entry.executionContext)
}

private def stopReporterTickers(): Unit = {
val currentMetricsTicker = _metricsTickerSchedule.get()
if(currentMetricsTicker != null)
currentMetricsTicker.cancel(false)

val currentSpansTicker = _spansTickerSchedule.get()
if(currentSpansTicker != null)
currentSpansTicker.cancel(false)
}

private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized {
_metricReporterModules.values
}
Expand Down Expand Up @@ -368,10 +373,14 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:

// Remove the module from all registries
_registeredModules = _registeredModules - entry.name
if(entry.module.isInstanceOf[MetricReporter])
if(entry.module.isInstanceOf[MetricReporter]) {
_metricReporterModules = _metricReporterModules - entry.name
if(entry.module.isInstanceOf[SpanReporter])
scheduleMetricsTick(entry.asInstanceOf[Entry[MetricReporter]], metricRegistry.snapshot(resetState = false))
}
if(entry.module.isInstanceOf[SpanReporter]) {
_spanReporterModules = _spanReporterModules - entry.name
scheduleSpansBatch(entry.asInstanceOf[Entry[SpanReporter]], tracer.spans())
}


// Schedule a call to stop on the module
Expand Down