Skip to content

Commit

Permalink
More tags (#2157)
Browse files Browse the repository at this point in the history
* config(metrics): record task completion time

This changes the meaning of the task.invocations.duration metric: it now
records only the time taken by this particular invocation, not the time
elapsed since the beginning of that task's execution.

In CompleteTaskHandler, we record the entire time it took for the
execution to complete.

In the example below showing the lifecycle of a task:

StartTask  RunTask    RunTask            RunTask         CompleteTask
 |----------|---|-------|-|--------------|-----|---------------|--> time
            ^-A-^       ^B^              ^--C--^
 ^-----------------------------D-------------------------------^

We would have:
* task.invocations.duration recording spans A, B and C (3, 1 and 5 ticks respectively)
* task.completions.duration recording span D

Additionally, this removes the counters, since the timers already provide
invocation counts for free (no need to double count)
  • Loading branch information
dreynaud committed Apr 19, 2018
1 parent c937e58 commit e40b7ac
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spectator.api.BasicTag
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.ExecutionStatus.*
import com.netflix.spinnaker.orca.events.TaskComplete
Expand All @@ -25,26 +27,30 @@ import com.netflix.spinnaker.orca.pipeline.model.Task
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor
import com.netflix.spinnaker.orca.q.*
import com.netflix.spinnaker.orca.q.metrics.MetricsTagHelper
import com.netflix.spinnaker.q.Queue
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Component
import java.time.Clock
import java.util.concurrent.TimeUnit

@Component
class CompleteTaskHandler(
override val queue: Queue,
override val repository: ExecutionRepository,
override val contextParameterProcessor: ContextParameterProcessor,
@Qualifier("queueEventPublisher") private val publisher: ApplicationEventPublisher,
private val clock: Clock
private val clock: Clock,
private val registry: Registry
) : OrcaMessageHandler<CompleteTask>, ExpressionAware {

override fun handle(message: CompleteTask) {
message.withTask { stage, task ->
task.status = message.status
task.endTime = clock.millis()
val mergedContextStage = stage.withMergedContext()
trackResult(stage, task, message.status)

if (message.status == REDIRECT) {
mergedContextStage.handleRedirect()
Expand Down Expand Up @@ -97,4 +103,19 @@ class CompleteTaskHandler(
queue.push(StartTask(execution.type, execution.id, execution.application, id, tasks[start].id))
}
}

/**
 * Records how long the whole task took to complete, from its recorded start
 * time until now, into the `task.completions.duration` timers.
 *
 * Two timers are updated with the same duration:
 * - `task.completions.duration`, tagged with the common tags plus the application
 * - `task.completions.duration.withType`, tagged with the common and detailed task tags
 *
 * @param stage the stage the completed task belongs to; source of the tag values
 * @param taskModel the completed task; its `startTime` is the start of the measured span
 * @param status the status the task completed with, reported as a tag
 */
private fun trackResult(stage: Stage, taskModel: com.netflix.spinnaker.orca.pipeline.model.Task, status: ExecutionStatus) {
  val commonTags = MetricsTagHelper.commonTags(stage, taskModel, status)
  val detailedTags = MetricsTagHelper.detailedTaskTags(stage, taskModel, status)

  // We are looking at the time it took to complete the whole execution, not just one invocation.
  // A task without a start time is recorded as a zero-length completion: falling back to a start
  // time of 0 would report the current epoch time as the duration, while recording zero still
  // keeps the timer's sample count (which doubles as the completion counter) accurate.
  val elapsedMillis = taskModel.startTime?.let { clock.millis() - it } ?: 0L

  mapOf(
    "task.completions.duration" to commonTags + BasicTag("application", stage.execution.application),
    "task.completions.duration.withType" to commonTags + detailedTags
  ).forEach { (name, tags) ->
    registry.timer(name, tags).record(elapsedMillis, TimeUnit.MILLISECONDS)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spectator.api.BasicTag
import com.netflix.spectator.api.Registry
import com.netflix.spectator.api.histogram.BucketCounter
import com.netflix.spinnaker.orca.*
import com.netflix.spinnaker.orca.ExecutionStatus.*
import com.netflix.spinnaker.orca.exceptions.ExceptionHandler
Expand All @@ -33,6 +33,7 @@ import com.netflix.spinnaker.orca.q.CompleteTask
import com.netflix.spinnaker.orca.q.InvalidTaskType
import com.netflix.spinnaker.orca.q.PauseTask
import com.netflix.spinnaker.orca.q.RunTask
import com.netflix.spinnaker.orca.q.metrics.MetricsTagHelper
import com.netflix.spinnaker.orca.time.toDuration
import com.netflix.spinnaker.orca.time.toInstant
import com.netflix.spinnaker.q.Message
Expand Down Expand Up @@ -62,6 +63,8 @@ class RunTaskHandler(
override fun handle(message: RunTask) {
message.withTask { stage, taskModel, task ->
val execution = stage.execution
val thisInvocationStartTimeMs = clock.millis()

try {
if (execution.isCanceled) {
task.onCancel(stage)
Expand All @@ -85,22 +88,22 @@ class RunTaskHandler(
when (result.status) {
RUNNING -> {
queue.push(message, task.backoffPeriod(taskModel, stage))
trackResult(stage, taskModel, result.status)
trackResult(stage, thisInvocationStartTimeMs, taskModel, result.status)
}
SUCCEEDED, REDIRECT, FAILED_CONTINUE -> {
queue.push(CompleteTask(message, result.status))
trackResult(stage, taskModel, result.status)
trackResult(stage, thisInvocationStartTimeMs, taskModel, result.status)
}
CANCELED -> {
task.onCancel(stage)
val status = stage.failureStatus(default = result.status)
queue.push(CompleteTask(message, status, result.status))
trackResult(stage, taskModel, status)
trackResult(stage, thisInvocationStartTimeMs, taskModel, status)
}
TERMINAL -> {
val status = stage.failureStatus(default = result.status)
queue.push(CompleteTask(message, status, result.status))
trackResult(stage, taskModel, status)
trackResult(stage, thisInvocationStartTimeMs, taskModel, status)
}
else ->
TODO("Unhandled task status ${result.status}")
Expand All @@ -113,51 +116,33 @@ class RunTaskHandler(
if (exceptionDetails?.shouldRetry == true) {
log.warn("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]")
queue.push(message, task.backoffPeriod(taskModel, stage))
trackResult(stage, taskModel, RUNNING)
trackResult(stage, thisInvocationStartTimeMs, taskModel, RUNNING)
} else if (e is TimeoutException && stage.context["markSuccessfulOnTimeout"] == true) {
queue.push(CompleteTask(message, SUCCEEDED))
} else {
log.error("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]", e)
stage.context["exception"] = exceptionDetails
repository.storeStage(stage)
queue.push(CompleteTask(message, stage.failureStatus()))
trackResult(stage, taskModel, stage.failureStatus())
trackResult(stage, thisInvocationStartTimeMs, taskModel, stage.failureStatus())
}
}
}
}

private fun trackResult(stage: Stage, taskModel: com.netflix.spinnaker.orca.pipeline.model.Task, status: ExecutionStatus) {
val id = registry.createId("task.invocations")
.withTag("status", status.toString())
.withTag("executionType", stage.execution.type.name.capitalize())
.withTag("taskType", taskModel.implementingClass)
.withTag("isComplete", status.isComplete.toString())
.withTag("application", stage.execution.application)
.let { id ->
stage.context["cloudProvider"]?.let {
id.withTag("cloudProvider", it.toString())
} ?: id
}
registry.counter(id).increment()
private fun trackResult(stage: Stage, thisInvocationStartTimeMs: Long, taskModel: com.netflix.spinnaker.orca.pipeline.model.Task, status: ExecutionStatus) {
val commonTags = MetricsTagHelper.commonTags(stage, taskModel, status)
val detailedTags = MetricsTagHelper.detailedTaskTags(stage, taskModel, status)

val distributionId = registry.createId("task.invocations.duration").withTags(id.tags())
BucketCounter
.get(registry, distributionId, { v -> bucketDuration(v) })
.record(System.currentTimeMillis() - (taskModel.startTime ?: 0))
}
val elapsedMillis = clock.millis() - thisInvocationStartTimeMs

fun bucketDuration(duration: Long): String {
return if (duration > TimeUnit.MINUTES.toMillis(60)) {
"gt60m"
} else if (duration > TimeUnit.MINUTES.toMillis(30)) {
"gt30m"
} else if (duration > TimeUnit.MINUTES.toMillis(15)) {
"gt15m"
} else if (duration > TimeUnit.MINUTES.toMillis(5)) {
"gt5m"
} else {
"lt5m"
hashMapOf(
"task.invocations.duration" to commonTags + BasicTag("application", stage.execution.application),
"task.invocations.duration.withType" to commonTags + detailedTags
).forEach {
name, tags ->
val id = registry.createId(name).withTags(tags)
registry.timer(name, tags).record(elapsedMillis, TimeUnit.MILLISECONDS)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.netflix.spinnaker.orca.q.metrics

import com.netflix.spectator.api.BasicTag
import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.pipeline.model.Stage

/**
 * Builds the tag sets attached to the task duration timers.
 *
 * Values read from the stage context that are absent are reported with the
 * placeholder value "n_a".
 */
class MetricsTagHelper {
  companion object {
    /** Tag value used when the stage context has no entry for a key. */
    private const val NOT_AVAILABLE = "n_a"

    /**
     * Tags shared by every task timer: the status, the capitalized execution
     * type, whether the status is complete, and the stage's cloud provider.
     *
     * @param stage source of the execution type and the `cloudProvider` context value
     * @param taskModel not read here; kept so the signature parallels [detailedTaskTags]
     * @param status the task status being reported
     */
    fun commonTags(stage: Stage, taskModel: com.netflix.spinnaker.orca.pipeline.model.Task, status: ExecutionStatus): Iterable<BasicTag> =
      listOf(
        BasicTag("status", status.toString()),
        BasicTag("executionType", stage.execution.type.name.capitalize()),
        BasicTag("isComplete", status.isComplete.toString()),
        stage.contextTag("cloudProvider"))

    /**
     * Higher-detail tags: the task's implementing class plus the stage's
     * `account` and `region` context values.
     *
     * @param stage source of the `account` and `region` context values
     * @param taskModel source of the `taskType` tag
     * @param status not read here; kept so the signature parallels [commonTags]
     */
    fun detailedTaskTags(stage: Stage, taskModel: com.netflix.spinnaker.orca.pipeline.model.Task, status: ExecutionStatus): Iterable<BasicTag> =
      listOf(
        BasicTag("taskType", taskModel.implementingClass),
        stage.contextTag("account"),
        stage.contextTag("region"))

    /**
     * Builds a tag named [key] from the stage context entry of the same name.
     *
     * The safe call matters: calling `toString()` directly on a nullable value
     * yields the literal string "null" rather than null, which would bypass the
     * placeholder and tag the metric with "null".
     */
    private fun Stage.contextTag(key: String): BasicTag =
      BasicTag(key, context[key]?.toString() ?: NOT_AVAILABLE)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.orca.ExecutionStatus.*
import com.netflix.spinnaker.orca.events.TaskComplete
import com.netflix.spinnaker.orca.fixture.pipeline
Expand Down Expand Up @@ -45,7 +47,7 @@ object CompleteTaskHandlerTest : SubjectSpek<CompleteTaskHandler>({
val clock = fixedClock()

subject(GROUP) {
CompleteTaskHandler(queue, repository, ContextParameterProcessor(), publisher, clock)
CompleteTaskHandler(queue, repository, ContextParameterProcessor(), publisher, clock, NoopRegistry())
}

fun resetMocks() = reset(queue, repository, publisher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1184,21 +1184,4 @@ object RunTaskHandlerTest : SubjectSpek<RunTaskHandler>({
verify(queue).push(isA<InvalidTaskType>())
}
}

describe("should bucket task durations") {
mapOf(
0 to "lt5m",
1 to "lt5m",
7 to "gt5m",
16 to "gt15m",
31 to "gt30m",
61 to "gt60m",
120 to "gt60m"
).forEach { minutes, expectedBucket ->
given("a task that is ${minutes} minutes old") {
val millis = TimeUnit.MINUTES.toMillis(minutes.toLong())
assertThat(subject.bucketDuration(millis)).isEqualTo(expectedBucket)
}
}
}
})

0 comments on commit e40b7ac

Please sign in to comment.