Skip to content

Commit

Permalink
= core: ensure that MinMaxCounters never record values bellow zero, f…
Browse files Browse the repository at this point in the history
…ixes #71
  • Loading branch information
ivantopo committed Aug 13, 2014
1 parent 0777a6f commit d2a2d59
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 13 deletions.
6 changes: 3 additions & 3 deletions kamon-core/src/main/java/kamon/jsr166/LongMaxUpdater.java
Expand Up @@ -114,17 +114,17 @@ public void reset() {
*
* @return the maximum
*/
public long maxThenReset() {
public long maxThenReset(long newValue) {
Cell[] as = cells;
long max = base;
base = Long.MIN_VALUE;
base = newValue;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null) {
long v = a.value;
a.value = Long.MIN_VALUE;
a.value = newValue;
if (v > max)
max = v;
}
Expand Down
Expand Up @@ -33,6 +33,7 @@ trait MinMaxCounter extends MetricRecorder {
def increment(times: Long): Unit
def decrement()
def decrement(times: Long)
def refreshValues(): Unit
}

object MinMaxCounter {
Expand Down Expand Up @@ -95,19 +96,21 @@ class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter
def refreshValues(): Unit = {
val currentValue = {
val value = sum.get()
if (value < 0) 0 else value
if (value <= 0) 0 else value
}

val currentMin = {
val minAbs = abs(min.maxThenReset())
if (minAbs <= currentValue) minAbs else 0
val rawMin = min.maxThenReset(-currentValue)
if (rawMin >= 0)
0
else
abs(rawMin)
}

val currentMax = max.maxThenReset(currentValue)

underlyingHistogram.record(currentValue)
underlyingHistogram.record(currentMin)
underlyingHistogram.record(max.maxThenReset())

max.update(currentValue)
min.update(-currentValue)
underlyingHistogram.record(currentMax)
}
}
Expand Up @@ -17,14 +17,15 @@ package kamon.metric.instrument

import java.nio.LongBuffer

import akka.actor.ActorSystem
import akka.actor._
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import kamon.metric.CollectionContext
import kamon.metric.instrument.Histogram.MutableRecord
import org.scalatest.{ Matchers, WordSpecLike }

class MinMaxCounterSpec extends WordSpecLike with Matchers {
val system = ActorSystem("min-max-counter-spec")
implicit val system = ActorSystem("min-max-counter-spec")
val minMaxCounterConfig = ConfigFactory.parseString(
"""
|refresh-interval = 1 hour
Expand Down Expand Up @@ -83,7 +84,7 @@ class MinMaxCounterSpec extends WordSpecLike with Matchers {
MutableRecord(2, 3)) // min, max and current
}

"report zero as the min and current values if they current value fell bellow zero" in new MinMaxCounterFixture {
"report zero as the min and current values if the current value fell bellow zero" in new MinMaxCounterFixture {
mmCounter.decrement(3)

val snapshot = collectCounterSnapshot()
Expand All @@ -93,6 +94,22 @@ class MinMaxCounterSpec extends WordSpecLike with Matchers {
snapshot.recordsIterator.toStream should contain(
MutableRecord(0, 3)) // min, max and current (even while current really is -3
}

"never record values bellow zero in very busy situations" in new MinMaxCounterFixture {
val monitor = TestProbe()
val workers = for (workers 1 to 50) yield {
system.actorOf(Props(new MinMaxCounterUpdateActor(mmCounter, monitor.ref)))
}

workers foreach (_ ! "increment")
for (refresh 1 to 1000) {
collectCounterSnapshot()
Thread.sleep(10)
}

monitor.expectNoMsg()
workers foreach (_ ! PoisonPill)
}
}

trait MinMaxCounterFixture {
Expand All @@ -106,3 +123,20 @@ class MinMaxCounterSpec extends WordSpecLike with Matchers {
def collectCounterSnapshot(): Histogram.Snapshot = mmCounter.collect(collectionContext)
}
}

class MinMaxCounterUpdateActor(mmc: MinMaxCounter, monitor: ActorRef) extends Actor {
val x = Array.ofDim[Int](4)
def receive = {
case "increment"
mmc.increment()
self ! "decrement"
case "decrement"
mmc.decrement()
self ! "increment"
try {
mmc.refreshValues()
} catch {
case _: IndexOutOfBoundsException monitor ! "failed"
}
}
}

0 comments on commit d2a2d59

Please sign in to comment.