Skip to content
This repository has been archived by the owner on Oct 31, 2022. It is now read-only.

Commit

Permalink
Merge pull request #25 from samstarling/race-condition
Browse files Browse the repository at this point in the history
Fix race condition issue reported in #24
  • Loading branch information
samstarling committed Jun 22, 2018
2 parents 85161de + 95a61b2 commit 38ea5ba
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
Expand Up @@ -36,40 +36,49 @@ class PrometheusStatsReceiver(registry: CollectorRegistry, namespace: String, ti

override def counter(verbosity: Verbosity, name: String*): Counter = {
val (metricName, labels) = extractLabels(name)
val c = counters
.getOrElseUpdate(metricName, this.synchronized { newCounter(metricName, labels.keys.toSeq) })
.labels(labels.values.toSeq: _*)
val counter = this.synchronized {
counters
.getOrElseUpdate(metricName, newCounter(metricName, labels.keys.toSeq))
.labels(labels.values.toSeq: _*)
}

new Counter {
override def incr(delta: Long): Unit = {
c.inc(delta)
counter.inc(delta)
}
}
}

override def stat(verbosity: Verbosity, name: String*): Stat = {
val (metricName, labels) = extractLabels(name)
val s = summaries
val summary = summaries
.getOrElseUpdate(metricName, this.synchronized { newSummary(metricName, labels.keys.toSeq) })
.labels(labels.values.toSeq: _*)

new Stat {
override def add(value: Float): Unit = {
s.observe(value)
summary.observe(value)
}
}
}

override def addGauge(verbosity: Verbosity, name: String*)(
f: => Float): Gauge = {
override def addGauge(verbosity: Verbosity, name: String*)(f: => Float): Gauge = {
val (metricName, labels) = extractLabels(name)
gauges
.getOrElseUpdate(metricName, this.synchronized { newGauge(metricName, labels.keys.toSeq) })
val labelValues = labels.values.toSeq
gaugeChilds
.getOrElseUpdate((metricName, labelValues), this.synchronized { gauges(metricName).labels(labelValues: _*) })
.set(f) // Set once initially

this.synchronized {
gauges
.getOrElseUpdate(metricName, newGauge(metricName, labels.keys.toSeq))
}

this.synchronized {
gaugeChilds
.getOrElseUpdate((metricName, labelValues), gauges(metricName).labels(labelValues: _*))
.set(f) // Set once initially
}

gaugeProviders.update((metricName, labelValues), () => f)

new Gauge {
override def remove(): Unit = gaugeProviders.remove((metricName, labelValues))
}
Expand Down
Expand Up @@ -10,11 +10,10 @@ class PrometheusStatsReceiverRaceTest extends UnitTest {

"PrometheusStatsReceiver#counters" should {


"handle creating and incrementing concurrently nicely" in {
val registry = new CollectorRegistry(true)
val statsReceiver = new PrometheusStatsReceiver(registry).scope("test")
val cf: Seq[Future[Unit]] = (1 to threadCount) map { n =>
val cf: Seq[Future[Unit]] = (1 to threadCount) map { _ =>
pool {
statsReceiver.counter("my_counter").incr(1)
}
Expand All @@ -39,19 +38,15 @@ class PrometheusStatsReceiverRaceTest extends UnitTest {
Await.result(joinedFutures, Duration(100, TimeUnit.MILLISECONDS))
registry.getSampleValue("finagle_my_counter", Array("serviceName"), Array("test")) === threadCount
}


}

"PrometheusStatsReceiver#counters#Gauges" should {


"reflect gauge value after creation" in {
val registry = new CollectorRegistry(true)
val mockTimer = new MockTimer
val statsReceiver = new PrometheusStatsReceiver(registry, "finagle", mockTimer, Duration(10, TimeUnit.SECONDS)).scope("test")

Time.withCurrentTimeFrozen { timeCtl =>
Time.withCurrentTimeFrozen { _ =>
var gaugeResult = 42
statsReceiver.addGauge("my_gauge") {
gaugeResult
Expand Down Expand Up @@ -98,6 +93,5 @@ class PrometheusStatsReceiverRaceTest extends UnitTest {
registry.getSampleValue("finagle_my_gauge", Array("serviceName"), Array("test")) === 8
}
}

}
}

0 comments on commit 38ea5ba

Please sign in to comment.