Skip to content

Commit

Permalink
Merge pull request #1026 from ivantopo/delay-span-reporting
Browse files Browse the repository at this point in the history
introduce Span Reporting delay
  • Loading branch information
SimunKaracic committed May 27, 2021
2 parents 3734b8d + bb7f0f1 commit 1a75ab2
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/* =========================================================================================
* Copyright © 2013-2017 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.trace

import kamon.Kamon
import kamon.testkit.{Reconfigure, SpanInspection, TestSpanReporter}
import org.scalactic.TimesOnInt.convertIntToRepeater
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar
import org.scalatest.{Matchers, OptionValues, WordSpec}


/**
  * Verifies how the tracer reports Spans whose sampling decision is changed via `span.trace.keep()` or
  * `span.trace.drop()`, both with the `kamon.trace.span-reporting-delay` setting left at its default and with the
  * delay enabled.
  *
  * Tests that enable the delay do so through [[withReportingDelay]], which restores "0 seconds" afterwards so the
  * setting does not leak into other tests and no test depends on another one having configured the tracer first.
  */
class SpanReportingDelaySpec extends WordSpec with Matchers with OptionValues with SpanInspection.Syntax with Eventually
    with SpanSugar with TestSpanReporter with Reconfigure {

  "the Kamon tracer" when {
    "has span reporting delay disabled" should {
      "keep spans with a positive sampling decision" in {
        val span = Kamon.spanBuilder("positive-span-without-delay").start()
        span.trace.keep()
        span.finish()

        eventually(timeout(5.seconds)) {
          val reportedSpan = testSpanReporter().nextSpan().value
          reportedSpan.operationName shouldBe span.operationName()
        }
      }

      "not report spans with a negative sampling decision" in {
        val span = Kamon.spanBuilder("negative-span-without-delay").start()
        span.trace.drop()
        span.finish()
        span.trace.keep() // Should not have any effect

        5 times {
          val allSpans = testSpanReporter().spans()
          allSpans.find(_.operationName == span.operationName()) shouldBe empty

          Thread.sleep(100) // Should be enough because Spans are reported every millisecond in tests
        }
      }
    }

    "has span reporting delay enabled" should {
      "keep spans with a positive sampling decision" in withReportingDelay("2 seconds") {
        val span = Kamon.spanBuilder("overwrite-to-positive-with-delay").start()
        span.trace.drop()
        span.finish()
        span.trace.keep() // Should force the Span to be reported, even though it was dropped before finishing

        // The timeout must stay above the configured reporting delay.
        eventually(timeout(5.seconds)) {
          val reportedSpan = testSpanReporter().nextSpan().value
          reportedSpan.operationName shouldBe span.operationName()
        }
      }

      "not report spans with a negative sampling decision" in withReportingDelay("2 seconds") {
        // Uses a name that no other test in this spec uses, so a Span from another test cannot influence the check.
        val span = Kamon.spanBuilder("overwrite-to-negative-with-delay").start()
        span.trace.keep()
        span.finish()
        span.trace.drop() // Should force the Span to be dropped, even though it was sampled before finishing

        // The reporting decision is only taken once the 2 second delay has elapsed, so keep checking for ~3 seconds.
        // A shorter window would pass even if the Span was going to be reported.
        30 times {
          val allSpans = testSpanReporter().spans()
          allSpans.find(_.operationName == span.operationName()) shouldBe empty

          Thread.sleep(100)
        }
      }
    }
  }

  /**
    * Runs `test` with `kamon.trace.span-reporting-delay` set to `delay` and restores the setting to "0 seconds"
    * (reporting delay disabled) afterwards, even when the test body fails.
    *
    * @param delay the delay in HOCON duration syntax, e.g. "2 seconds"
    * @param test  the test body to run while the delay is in place
    * @return whatever `test` returns
    */
  private def withReportingDelay[T](delay: String)(test: => T): T = {
    applyConfig(s"kamon.trace.span-reporting-delay = $delay")

    try test
    finally applyConfig("kamon.trace.span-reporting-delay = 0 seconds")
  }
}
10 changes: 10 additions & 0 deletions core/kamon-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ kamon {
# reporter has a separate queue.
reporter-queue-size = 4096

# Decides how long the tracer should wait before reporting Spans after they are finished. This setting keeps
# Spans in memory for a few seconds after they finish, in case users want to manually override the sampling
# decision for the trace later on. Keep in mind that:
# - Overriding the sampling decision is a local-only action, any Spans from other connected services will not be
# retained or affected in any way.
# - This will introduce additional memory consumption proportional to the reporting delay and the Spans volume.
#
# Setting this value to "0 seconds" disables the reporting delay.
span-reporting-delay = 0 seconds

# Decide whether a new, locally created Span should have the same Span Identifier as its remote parent (if any) or
# get a new local identifier. Certain tracing systems use the same Span Identifier to represent both sides (client
# and server) of a RPC call, if you are reporting data to such systems then this option should be enabled. This
Expand Down
41 changes: 29 additions & 12 deletions core/kamon-core/src/main/scala/kamon/trace/Span.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package kamon
package trace

import java.time.{Duration, Instant}

import kamon.context.Context
import kamon.tag.TagSet
import kamon.trace.Span.Link
import kamon.trace.Trace.SamplingDecision
import kamon.util.Clock
import org.slf4j.LoggerFactory

import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import scala.compat.Platform.EOL

/**
Expand Down Expand Up @@ -410,7 +410,7 @@ object Span {
val kind: Kind, localParent: Option[Span], initialOperationName: String, spanTags: TagSet.Builder, metricTags: TagSet.Builder,
createdAt: Instant, initialMarks: List[Mark], initialLinks: List[Link], initialTrackMetrics: Boolean, tagWithParentOperation: Boolean,
includeErrorStacktrace: Boolean, isDelayed: Boolean, clock: Clock, preFinishHooks: Array[Tracer.PreFinishHook],
onFinish: Span.Finished => Unit, sampler: Sampler) extends Span.Delayed {
onFinish: Span.Finished => Unit, sampler: Sampler, scheduler: ScheduledExecutorService, reportingDelay: Duration) extends Span.Delayed {

private val _metricTags = metricTags
private val _spanTags = spanTags
Expand Down Expand Up @@ -440,25 +440,25 @@ object Span {
}

override def tag(key: String, value: String): Span = synchronized {
if(isSampled && _isOpen)
if((isSampled || !reportingDelay.isZero) && _isOpen)
_spanTags.add(key, value)
this
}

override def tag(key: String, value: Long): Span = synchronized {
if(isSampled && _isOpen)
if((isSampled || !reportingDelay.isZero) && _isOpen)
_spanTags.add(key, value)
this
}

override def tag(key: String, value: Boolean): Span = synchronized {
if(isSampled && _isOpen)
if((isSampled || !reportingDelay.isZero) && _isOpen)
_spanTags.add(key, value)
this
}

override def tag(tags: TagSet): Span = synchronized {
if(isSampled && _isOpen)
if((isSampled || !reportingDelay.isZero) && _isOpen)
_spanTags.add(tags)
this
}
Expand Down Expand Up @@ -507,7 +507,7 @@ object Span {
if(_isOpen) {
_hasError = true

if(isSampled)
if((isSampled || !reportingDelay.isZero))
_spanTags.add(TagKeys.ErrorMessage, message)
}
this
Expand All @@ -517,7 +517,7 @@ object Span {
if(_isOpen) {
_hasError = true

if(isSampled) {
if((isSampled || !reportingDelay.isZero)) {
_spanTags.add(TagKeys.ErrorMessage, throwable.getMessage)

if(includeErrorStacktrace)
Expand All @@ -531,7 +531,7 @@ object Span {
if(_isOpen) {
_hasError = true

if(isSampled) {
if((isSampled || !reportingDelay.isZero)) {
_spanTags.add(TagKeys.ErrorMessage, message)

if(includeErrorStacktrace)
Expand Down Expand Up @@ -617,7 +617,7 @@ object Span {
private def toStackTraceString(throwable: Throwable): String =
throwable.getStackTrace().mkString("", EOL, EOL)

private def toFinishedSpan(to: Instant, metricTags: TagSet): Span.Finished =
protected def toFinishedSpan(to: Instant, metricTags: TagSet): Span.Finished =
Span.Finished(id, trace, parentId, _operationName, _hasError, isDelayed, createdAt, to, kind, position, _spanTags.build(),
metricTags, _marks, _links)

Expand All @@ -643,10 +643,20 @@ object Span {
}

private def reportSpan(finishedAt: Instant, metricTags: TagSet): Unit = {
if(isSampled)
onFinish(toFinishedSpan(finishedAt, metricTags))
if(reportingDelay.isZero) {
if(isSampled)
onFinish(toFinishedSpan(finishedAt, metricTags))
}
else {
scheduler.schedule(
new DelayedReportingRunnable(finishedAt, metricTags),
reportingDelay.toMillis,
TimeUnit.MILLISECONDS
)
}
}


private def createMetricTags(): TagSet = {
_metricTags.add(TagKeys.OperationName, _operationName)
_metricTags.add(TagKeys.Error, _hasError)
Expand All @@ -662,6 +672,13 @@ object Span {

_metricTags.build()
}

/**
  * Task that reportSpan hands to the scheduler when a non-zero reporting delay is configured. The sampling check
  * happens when the task runs rather than when the Span finished, and the finished Span is only built and passed to
  * `onFinish` if `isSampled` holds at that moment.
  *
  * @param finishedAt the instant passed on to `toFinishedSpan` as the end of the Span
  * @param metricTags the metric tags passed on to `toFinishedSpan`
  */
private class DelayedReportingRunnable(finishedAt: Instant, metricTags: TagSet) extends Runnable {
  override def run(): Unit = {
    val shouldReport = isSampled

    if (shouldReport) {
      val finishedSpan = toFinishedSpan(finishedAt, metricTags)
      onFinish(finishedSpan)
    }
  }
}
}

object Local {
Expand Down
9 changes: 6 additions & 3 deletions core/kamon-core/src/main/scala/kamon/trace/Tracer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
package kamon
package trace

import java.time.Instant
import java.time.{Duration, Instant}
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

import com.typesafe.config.Config
import kamon.context.Context
import kamon.tag.TagSet
Expand Down Expand Up @@ -53,6 +52,7 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage
@volatile private var _adaptiveSamplerSchedule: Option[ScheduledFuture[_]] = None
@volatile private var _preStartHooks: Array[Tracer.PreStartHook] = Array.empty
@volatile private var _preFinishHooks: Array[Tracer.PreFinishHook] = Array.empty
@volatile private var _delayedSpanReportingDelay: Duration = Duration.ZERO
private val _onSpanFinish: Span.Finished => Unit = _spanBuffer.offer

reconfigure(initialConfig)
Expand Down Expand Up @@ -340,7 +340,8 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage
}

new Span.Local(id, parentId, trace, position, _kind, localParent, _name, _spanTags, _metricTags, at, _marks, _links,
_trackMetrics, _tagWithParentOperation, _includeErrorStacktrace, isDelayed, clock, _preFinishHooks, _onSpanFinish, _sampler)
_trackMetrics, _tagWithParentOperation, _includeErrorStacktrace, isDelayed, clock, _preFinishHooks, _onSpanFinish,
_sampler, scheduler, _delayedSpanReportingDelay)
}

private def suggestedOrSamplerDecision(): SamplingDecision =
Expand Down Expand Up @@ -408,6 +409,7 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage
val tagWithUpstreamService = traceConfig.getBoolean("span-metric-tags.upstream-service")
val tagWithParentOperation = traceConfig.getBoolean("span-metric-tags.parent-operation")
val includeErrorStacktrace = traceConfig.getBoolean("include-error-stacktrace")
val delayedSpanReportingDelay = traceConfig.getDuration("span-reporting-delay")

if(_traceReporterQueueSize != traceReporterQueueSize) {
// By simply changing the buffer we might be dropping Spans that have not been collected yet by the reporters.
Expand All @@ -424,6 +426,7 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage
_tagWithUpstreamService = tagWithUpstreamService
_tagWithParentOperation = tagWithParentOperation
_traceReporterQueueSize = traceReporterQueueSize
_delayedSpanReportingDelay = delayedSpanReportingDelay
_preStartHooks = preStartHooks
_preFinishHooks = preFinishHooks

Expand Down

0 comments on commit 1a75ab2

Please sign in to comment.