Skip to content

Commit

Permalink
= core: minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
dpsoft committed Jul 10, 2016
1 parent ef927dc commit b4bd658
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 30 deletions.
42 changes: 30 additions & 12 deletions kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
Expand Up @@ -21,15 +21,18 @@ import java.util.concurrent.ConcurrentLinkedQueue
import akka.event.LoggingAdapter
import kamon.Kamon
import kamon.metric.{ SegmentMetrics, TraceMetrics }
import kamon.trace.States.Status
import kamon.util.{ NanoInterval, RelativeNanoTimestamp }

import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap

private[kamon] class MetricsOnlyContext(traceName: String, val token: String, traceTags: Map[String, String], currentStatus: Status, val levelOfDetail: LevelOfDetail,
val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter)
extends TraceContext {
private[kamon] class MetricsOnlyContext(traceName: String,
val token: String,
traceTags: Map[String, String],
currentStatus: Status,
val levelOfDetail: LevelOfDetail,
val startTimestamp: RelativeNanoTimestamp,
log: LoggingAdapter) extends TraceContext {

@volatile private var _name = traceName
@volatile private var _status = currentStatus
Expand All @@ -40,7 +43,7 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, tr
private val _tags = TrieMap.empty[String, String] ++= traceTags

def rename(newName: String): Unit =
if (States.Open == status)
if (Status.Open == status)
_name = newName
else
log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName)
Expand All @@ -53,7 +56,7 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, tr
def removeTag(key: String, value: String): Boolean = _tags.remove(key, value)

private def finish(withError: Boolean): Unit = {
_status = if (withError) States.FinishedWithError else States.FinishedSuccessfully
_status = if (withError) Status.FinishedWithError else Status.FinishedSuccessfully
val traceElapsedTime = NanoInterval.since(startTimestamp)
_elapsedTime = traceElapsedTime

Expand Down Expand Up @@ -95,7 +98,13 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, tr
}
}

protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval, segmentTags: Map[String, String], isFinishedWithError: Boolean): Unit = {
protected def finishSegment(segmentName: String,
category: String,
library: String,
duration: NanoInterval,
segmentTags: Map[String, String],
isFinishedWithError: Boolean): Unit = {

_finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration, segmentTags, isFinishedWithError))

if (isClosed) {
Expand All @@ -110,13 +119,17 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, tr
// will be returned.
def elapsedTime: NanoInterval = _elapsedTime

class MetricsOnlySegment(segmentName: String, val category: String, val library: String, segmentTags: Map[String, String]) extends Segment {
class MetricsOnlySegment(segmentName: String,
val category: String,
val library: String,
segmentTags: Map[String, String]) extends Segment {

private val _startTimestamp = RelativeNanoTimestamp.now
protected val _tags = TrieMap.empty[String, String] ++= segmentTags

@volatile private var _segmentName = segmentName
@volatile private var _elapsedTime = NanoInterval.default
@volatile private var _status: Status = States.Open
@volatile private var _status: Status = Status.Open

def name: String = _segmentName
def isEmpty: Boolean = false
Expand All @@ -126,13 +139,13 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, tr
def removeTag(key: String, value: String): Boolean = _tags.remove(key, value)

def rename(newName: String): Unit =
if (States.Open == status)
if (Status.Open == status)
_segmentName = newName
else
log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName)

private def finish(withError: Boolean): Unit = {
_status = if (withError) States.FinishedWithError else States.FinishedSuccessfully
_status = if (withError) Status.FinishedWithError else Status.FinishedSuccessfully
val segmentElapsedTime = NanoInterval.since(_startTimestamp)
_elapsedTime = segmentElapsedTime

Expand All @@ -154,4 +167,9 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, tr
}
}

case class SegmentLatencyData(name: String, category: String, library: String, duration: NanoInterval, tags: Map[String, String], isFinishedWithError: Boolean)
case class SegmentLatencyData(name: String,
category: String,
library: String,
duration: NanoInterval,
tags: Map[String, String],
isFinishedWithError: Boolean)
10 changes: 5 additions & 5 deletions kamon-core/src/main/scala/kamon/trace/TraceContext.scala
Expand Up @@ -19,7 +19,7 @@ package kamon.trace
import java.io.ObjectStreamException
import java.util

import kamon.trace.States.{ Closed, Status }
import kamon.trace.Status.Closed
import kamon.trace.TraceContextAware.DefaultTraceContextAware
import kamon.util.{ Function, RelativeNanoTimestamp, SameThreadExecutionContext, Supplier }

Expand All @@ -30,7 +30,7 @@ trait TraceContext {
def token: String
def isEmpty: Boolean
def nonEmpty: Boolean = !isEmpty
def isClosed: Boolean = !(States.Open == status)
def isClosed: Boolean = !(Status.Open == status)
def status: Status
def finish(): Unit
def finishWithError(cause: Throwable): Unit
Expand Down Expand Up @@ -88,7 +88,7 @@ trait Segment {
def library: String
def isEmpty: Boolean
def nonEmpty: Boolean = !isEmpty
def isClosed: Boolean = !(States.Open == status)
def isClosed: Boolean = !(Status.Open == status)
def status: Status
def finish(): Unit
def finishWithError(cause: Throwable): Unit
Expand Down Expand Up @@ -148,8 +148,8 @@ object LevelOfDetail {
case object FullTrace extends LevelOfDetail
}

object States {
sealed trait Status
sealed trait Status
object Status {
case object Open extends Status
case object Closed extends Status
case object FinishedWithError extends Status
Expand Down
5 changes: 2 additions & 3 deletions kamon-core/src/main/scala/kamon/trace/TracerModule.scala
Expand Up @@ -23,7 +23,6 @@ import akka.event.{ Logging, LoggingAdapter }
import com.typesafe.config.Config
import kamon.Kamon
import kamon.metric.MetricsModule
import kamon.trace.States.Status
import kamon.util._

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -140,7 +139,7 @@ private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: C
createTraceContext(name, token, tags, timestamp, status, isLocal)

private def createTraceContext(traceName: String, token: Option[String], tags: Map[String, String] = Map.empty, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now,
status: Status = States.Open, isLocal: Boolean = true): TraceContext = {
status: Status = Status.Open, isLocal: Boolean = true): TraceContext = {

def newMetricsOnlyContext(token: String): TraceContext =
new MetricsOnlyContext(traceName, token, tags, status, _settings.levelOfDetail, startTimestamp, _logger)
Expand All @@ -153,7 +152,7 @@ private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: C
case _ if !isLocal || !_settings.sampler.shouldTrace
newMetricsOnlyContext(traceToken)
case _
new TracingContext(traceName, traceToken, tags, currentStatus = States.Open, _settings.levelOfDetail, isLocal, startTimestamp, _logger, dispatchTracingContext)
new TracingContext(traceName, traceToken, tags, currentStatus = Status.Open, _settings.levelOfDetail, isLocal, startTimestamp, _logger, dispatchTracingContext)
}
}

Expand Down
21 changes: 15 additions & 6 deletions kamon-core/src/main/scala/kamon/trace/TracingContext.scala
Expand Up @@ -20,14 +20,19 @@ import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

import akka.event.LoggingAdapter
import kamon.trace.States.Status
import kamon.util.{ NanoInterval, NanoTimestamp, RelativeNanoTimestamp }

import scala.collection.concurrent.TrieMap

private[trace] class TracingContext(traceName: String, token: String, tags: Map[String, String], currentStatus: Status, levelOfDetail: LevelOfDetail,
isLocal: Boolean, startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, traceInfoSink: TracingContext Unit)
extends MetricsOnlyContext(traceName, token, tags, currentStatus, levelOfDetail, startTimeztamp, log) {
private[trace] class TracingContext(traceName: String,
token: String,
tags: Map[String, String],
currentStatus: Status,
levelOfDetail: LevelOfDetail,
isLocal: Boolean,
startTimeztamp: RelativeNanoTimestamp,
log: LoggingAdapter,
traceInfoSink: TracingContext Unit) extends MetricsOnlyContext(traceName, token, tags, currentStatus, levelOfDetail, startTimeztamp, log) {

private val _openSegments = new AtomicInteger(0)
private val _startTimestamp = NanoTimestamp.now
Expand Down Expand Up @@ -57,7 +62,7 @@ private[trace] class TracingContext(traceName: String, token: String, tags: Map[
super.finishSegment(segmentName, category, library, duration, tags, isFinishedWithError)
}

def shouldIncubate: Boolean = (States.Open == status) || _openSegments.get() > 0
def shouldIncubate: Boolean = (Status.Open == status) || _openSegments.get() > 0

// Handle with care, should only be used after a trace is finished.
def generateTraceInfo: TraceInfo = {
Expand All @@ -77,7 +82,11 @@ private[trace] class TracingContext(traceName: String, token: String, tags: Map[
TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result())
}

class TracingSegment(segmentName: String, category: String, library: String, tags: Map[String, String]) extends MetricsOnlySegment(segmentName, category, library, tags) {
class TracingSegment(segmentName: String,
category: String,
library: String,
tags: Map[String, String]) extends MetricsOnlySegment(segmentName, category, library, tags) {

private val metadata = TrieMap.empty[String, String]
override def addMetadata(key: String, value: String): Unit = metadata.put(key, value)

Expand Down
@@ -1,6 +1,6 @@
/*
* =========================================================================================
* Copyright © 2013-2014 the kamon project <http://kamon.io/>
* Copyright © 2013-2016 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
Expand All @@ -24,7 +24,6 @@ import org.slf4j.MDC
trait MdcKeysSupport {

val traceTokenKey = "traceToken"

val traceNameKey = "traceName"

private val defaultKeys = Seq(traceTokenKey, traceNameKey)
Expand All @@ -37,7 +36,7 @@ trait MdcKeysSupport {
// Java variant.
def withMdc[A](thunk: Supplier[A]): A = withMdc(thunk.get)

private[this] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match {
private[kamon] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match {
case ctx: MetricsOnlyContext

// Add the default key value pairs for the trace token and trace name.
Expand All @@ -46,7 +45,7 @@ trait MdcKeysSupport {

defaultKeys ++ ctx.traceLocalStorage.underlyingStorage.collect {
case (available: AvailableToMdc, value) Map(available.mdcKey -> String.valueOf(value))
}.map { value value.map { case (k, v) MDC.put(k, v); k } }.flatten
}.flatMap { value value.map { case (k, v) MDC.put(k, v); k } }

case EmptyTraceContext Iterable.empty[String]
}
Expand Down

0 comments on commit b4bd658

Please sign in to comment.