Skip to content

Commit

Permalink
refactor(metrics): Use an Atlas timer instead of gauges to track lag
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed Mar 1, 2018
1 parent bad0b55 commit e37e0ea
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 93 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ junitVersion=1.0.2
jupiterVersion=5.0.2
junitLegacyVersion=4.12.0
spekVersion=1.1.5
keikoVersion=2.5.0
keikoVersion=2.5.1
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import org.springframework.stereotype.Component
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicReference
import javax.annotation.PostConstruct
import kotlin.math.max

/**
* Monitors a queue and generates Atlas metrics.
Expand All @@ -52,9 +52,7 @@ class AtlasQueueMonitor
when (event) {
QueuePolled -> _lastQueuePoll.set(clock.instant())
is MessageProcessing -> {
_meanMessageLag.updateAndGet { it + event.lag.toMillis() }
_medianMessageLag.updateAndGet { it + event.lag.toMillis() }
_maxMessageLag.updateAndGet { max(it, event.lag.toMillis()) }
registry.timer("queue.message.lag").record(event.lag.toMillis(), MILLISECONDS)
}
is RetryPolled -> _lastRetryPoll.set(clock.instant())
is MessagePushed -> event.counter.increment()
Expand Down Expand Up @@ -101,21 +99,6 @@ class AtlasQueueMonitor
.toMillis()
.toDouble()
})
registry.gauge("queue.mean.lag", this, {
it.meanMessageLag
.toMillis()
.toDouble()
})
registry.gauge("queue.median.lag", this, {
it.medianMessageLag
.toMillis()
.toDouble()
})
registry.gauge("queue.max.lag", this, {
it.maxMessageLag
.toMillis()
.toDouble()
})
}

/**
Expand All @@ -136,26 +119,6 @@ class AtlasQueueMonitor
get() = _lastState.get()
private val _lastState = AtomicReference<QueueState>(QueueState(0, 0, 0))

val meanMessageLag: Duration
get() = _meanMessageLag
.getAndSet(emptyList())
.average()
.let { Duration.ofMillis(it.toLong()) }
private val _meanMessageLag = AtomicReference<List<Long>>(emptyList())

val medianMessageLag: Duration
get() = _medianMessageLag
.getAndSet(emptyList())
.median()
.let { Duration.ofMillis(it.toLong()) }
private val _medianMessageLag = AtomicReference<List<Long>>(emptyList())

val maxMessageLag: Duration
get() = _maxMessageLag
.getAndSet(0)
.let { Duration.ofMillis(it) }
private val _maxMessageLag = AtomicReference<Long>(0)

/**
* Count of messages pushed to the queue.
*/
Expand Down Expand Up @@ -214,10 +177,3 @@ class AtlasQueueMonitor
private val MessageNotFound.counter: Counter
get() = registry.counter("queue.message.notfound")
}

private fun List<Long>.median(): Double =
when {
isEmpty() -> Double.NaN
size % 2 == 1 -> sorted()[size / 2].toDouble()
else -> sorted().subList((size / 2) - 1, (size / 2) + 1).average()
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ package com.netflix.spinnaker.orca.q.metrics

import com.netflix.spectator.api.Counter
import com.netflix.spectator.api.Registry
import com.netflix.spectator.api.Timer
import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE
import com.netflix.spinnaker.orca.q.StartExecution
import com.netflix.spinnaker.orca.time.fixedClock
import com.netflix.spinnaker.q.metrics.*
import com.nhaarman.mockito_kotlin.*
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.jetbrains.spek.api.lifecycle.CachingMode.GROUP
import org.jetbrains.spek.subject.SubjectSpek
import java.time.Duration
import java.time.Duration.ZERO
import java.time.Instant.now
import java.util.concurrent.TimeUnit.MILLISECONDS

object AtlasQueueMonitorTest : SubjectSpek<AtlasQueueMonitor>({

Expand All @@ -45,6 +45,7 @@ object AtlasQueueMonitorTest : SubjectSpek<AtlasQueueMonitor>({
val deadCounter: Counter = mock()
val duplicateCounter: Counter = mock()
val lockFailedCounter: Counter = mock()
val messageLagTimer: Timer = mock()

val registry: Registry = mock {
on { counter(eq("queue.pushed.messages"), anyVararg<String>()) } doReturn pushCounter
Expand All @@ -53,6 +54,7 @@ object AtlasQueueMonitorTest : SubjectSpek<AtlasQueueMonitor>({
on { counter("queue.dead.messages") } doReturn deadCounter
on { counter(eq("queue.duplicate.messages"), anyVararg<String>()) } doReturn duplicateCounter
on { counter("queue.lock.failed") } doReturn lockFailedCounter
on { timer("queue.message.lag") } doReturn messageLagTimer
}

subject(GROUP) {
Expand Down Expand Up @@ -98,50 +100,17 @@ object AtlasQueueMonitorTest : SubjectSpek<AtlasQueueMonitor>({
}
}

describe("message lag metrics") {
given("no messages have been processed") {
afterGroup(::resetMocks)

it("reports zero lag time") {
assertThat(subject.meanMessageLag).isEqualTo(ZERO)
assertThat(subject.medianMessageLag).isEqualTo(ZERO)
assertThat(subject.maxMessageLag).isEqualTo(ZERO)
}
}

given("some messages have been processed") {
afterGroup(::resetMocks)

val lag = sequenceOf(
Duration.ofSeconds(5),
Duration.ofSeconds(13),
Duration.ofSeconds(7)
)
val events = lag.mapIndexed { i, lag ->
MessageProcessing(StartExecution(PIPELINE, "$i", "covfefe"), lag)
}

on("receiving a ${events.first().javaClass.simpleName} event") {
events.forEach(subject::onQueueEvent)
}

it("averages the lag time") {
assertThat(subject.meanMessageLag).isEqualTo(lag.average())
// after reading the mean should reset
assertThat(subject.meanMessageLag).isEqualTo(ZERO)
}

it("records the median lag time") {
assertThat(subject.medianMessageLag).isEqualTo(lag.sorted().toList()[1])
// after reading the mean should reset
assertThat(subject.medianMessageLag).isEqualTo(ZERO)
}

it("records the max lag time") {
assertThat(subject.maxMessageLag).isEqualTo(lag.max())
// after reading the max should reset
assertThat(subject.maxMessageLag).isEqualTo(ZERO)
}
describe("when a message is processed") {
afterGroup(::resetMocks)

val event = MessageProcessing(StartExecution(PIPELINE, "1", "covfefe"), Duration.ofSeconds(5))

on("receiving a ${event.javaClass.simpleName} event") {
subject.onQueueEvent(event)
}

it("records the lag") {
verify(messageLagTimer).record(event.lag.toMillis(), MILLISECONDS)
}
}

Expand Down

0 comments on commit e37e0ea

Please sign in to comment.