Skip to content

Commit

Permalink
chore(kayenta): use new API for synthetic stages
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed Apr 6, 2018
1 parent 65bfafb commit e1202b1
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

@Component
public class WaitStage implements StageDefinitionBuilder {

public static String TYPE = "wait";

@Override
public void taskGraph(Stage stage, TaskNode.Builder builder) {
builder.withTask("wait", WaitTask.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ import com.netflix.spinnaker.orca.kayenta.model.KayentaCanaryContext
import com.netflix.spinnaker.orca.kayenta.model.RunCanaryContext
import com.netflix.spinnaker.orca.kayenta.tasks.AggregateCanaryResultsTask
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder.newStage
import com.netflix.spinnaker.orca.pipeline.TaskNode.Builder
import com.netflix.spinnaker.orca.pipeline.WaitStage
import com.netflix.spinnaker.orca.pipeline.graph.StageGraphBuilder
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.model.SyntheticStageOwner.STAGE_BEFORE
import org.springframework.stereotype.Component
import java.time.Clock
import java.time.Duration
Expand All @@ -40,13 +39,9 @@ import java.time.Instant
import java.time.Instant.now
import java.time.temporal.ChronoUnit.MINUTES
import java.util.*
import java.util.Collections.singletonMap

@Component
class KayentaCanaryStage(
private val clock: Clock,
private val waitStage: WaitStage
) : StageDefinitionBuilder {
class KayentaCanaryStage(private val clock: Clock) : StageDefinitionBuilder {

private val mapper = OrcaObjectMapper
.newInstance()
Expand All @@ -56,8 +51,8 @@ class KayentaCanaryStage(
builder.withTask<AggregateCanaryResultsTask>("aggregateCanaryResults")
}

override fun aroundStages(stage: Stage): List<Stage> {
val canaryConfig = stage.mapTo<KayentaCanaryContext>("/canaryConfig")
override fun beforeStages(parent: Stage, graph: StageGraphBuilder) {
val canaryConfig = parent.mapTo<KayentaCanaryContext>("/canaryConfig")

if (canaryConfig.scopes.isEmpty()) {
throw IllegalArgumentException("Canary stage configuration must contain at least one scope.")
Expand All @@ -81,29 +76,33 @@ class KayentaCanaryStage(

val stages = ArrayList<Stage>()

if (canaryConfig.beginCanaryAnalysisAfter > ZERO) {
stages.add(newStage(
stage.execution,
waitStage.type,
"Warmup Wait",
singletonMap<String, Any>("waitTime", canaryConfig.beginCanaryAnalysisAfter.seconds),
stage,
STAGE_BEFORE
))
var previous: Stage? = if (canaryConfig.beginCanaryAnalysisAfter > ZERO) {
graph.add {
it.type = WaitStage.TYPE
it.name = "Warmup Wait"
it.context["waitTime"] = canaryConfig.beginCanaryAnalysisAfter.seconds
}
} else {
null
}

for (i in 1..numIntervals) {
// If an end time was explicitly specified, we don't need to synchronize
// the execution of the canary pipeline with the real time.
if (canaryConfig.endTime == null) {
stages.add(newStage(
stage.execution,
waitStage.type,
"Interval Wait #" + i,
singletonMap<String, Any>("waitTime", canaryAnalysisInterval.seconds),
stage,
STAGE_BEFORE
))
previous = if (previous == null) {
graph.add {
it.type = WaitStage.TYPE
it.name = "Interval Wait #$i"
it.context["waitTime"] = canaryAnalysisInterval.seconds
}
} else {
graph.connect(previous) {
it.type = WaitStage.TYPE
it.name = "Interval Wait #$i"
it.context["waitTime"] = canaryAnalysisInterval.seconds
}
}
}

val runCanaryContext = RunCanaryContext(
Expand All @@ -114,17 +113,20 @@ class KayentaCanaryStage(
canaryConfig.scoreThresholds
)

stages.add(newStage(
stage.execution,
RunCanaryPipelineStage.STAGE_TYPE,
"Run Canary #" + i,
mapper.convertValue<Map<String, Any>>(runCanaryContext),
stage,
STAGE_BEFORE
))
previous = if (previous == null) {
graph.add {
it.type = RunCanaryPipelineStage.STAGE_TYPE
it.name = "Run Canary #$i"
it.context.putAll(mapper.convertValue<Map<String, Any>>(runCanaryContext))
}
} else {
graph.connect(previous) {
it.type = RunCanaryPipelineStage.STAGE_TYPE
it.name = "Run Canary #$i"
it.context.putAll(mapper.convertValue<Map<String, Any>>(runCanaryContext))
}
}
}

return stages
}

fun buildRequestScopes(config: KayentaCanaryContext, interval: Int, intervalDuration: Duration): Map<String, CanaryScopes> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.netflix.spinnaker.orca.ext.mapTo
import com.netflix.spinnaker.orca.fixture.stage
import com.netflix.spinnaker.orca.kayenta.CanaryScope
import com.netflix.spinnaker.orca.pipeline.WaitStage
import com.netflix.spinnaker.orca.pipeline.graph.StageGraphBuilder
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.spek.and
import com.netflix.spinnaker.spek.values
Expand All @@ -36,9 +37,8 @@ import java.time.temporal.ChronoUnit.MINUTES

object KayentaCanaryStageTest : Spek({

val waitStage = WaitStage()
val clock = fixedClock()
val builder = KayentaCanaryStage(clock, waitStage)
val builder = KayentaCanaryStage(clock)

describe("planning a canary stage") {
given("start/end times are specified") {
Expand All @@ -65,7 +65,11 @@ object KayentaCanaryStageTest : Spek({
)
}

val aroundStages = builder.aroundStages(kayentaCanaryStage)
val aroundStages = StageGraphBuilder.beforeStages(kayentaCanaryStage)
.let { graph ->
builder.beforeStages(kayentaCanaryStage, graph)
graph.build()
}

it("should not introduce wait stages") {
assertThat(aroundStages).extracting("type").isEqualTo(expectedStageTypes)
Expand Down Expand Up @@ -110,7 +114,11 @@ object KayentaCanaryStageTest : Spek({
)
}

val aroundStages = builder.aroundStages(kayentaCanaryStage)
val aroundStages = StageGraphBuilder.beforeStages(kayentaCanaryStage)
.let { graph ->
builder.beforeStages(kayentaCanaryStage, graph)
graph.build()
}

it("still handles canary intervals properly") {
aroundStages
Expand Down Expand Up @@ -165,8 +173,12 @@ object KayentaCanaryStageTest : Spek({
)
}

val builder = KayentaCanaryStage(clock, waitStage)
val aroundStages = builder.aroundStages(kayentaCanaryStage)
val builder = KayentaCanaryStage(clock)
val aroundStages = StageGraphBuilder.beforeStages(kayentaCanaryStage)
.let { graph ->
builder.beforeStages(kayentaCanaryStage, graph)
graph.build()
}

it("should start now") {
assertThat(aroundStages).extracting("type").isEqualTo(expectedStageTypes)
Expand Down Expand Up @@ -231,7 +243,11 @@ object KayentaCanaryStageTest : Spek({
)
}

val aroundStages = builder.aroundStages(kayentaCanaryStage)
val aroundStages = StageGraphBuilder.beforeStages(kayentaCanaryStage)
.let { graph ->
builder.beforeStages(kayentaCanaryStage, graph)
graph.build()
}

if (warmupMins != null) {
val expectedWarmupWait = warmupMins.toInt()
Expand Down Expand Up @@ -306,7 +322,11 @@ object KayentaCanaryStageTest : Spek({
)
}

val aroundStages = builder.aroundStages(kayentaCanaryStage)
val aroundStages = StageGraphBuilder.beforeStages(kayentaCanaryStage)
.let { graph ->
builder.beforeStages(kayentaCanaryStage, graph)
graph.build()
}

it("propagates the additional attributes") {
assertThat(aroundStages.controlScopes())
Expand All @@ -328,7 +348,7 @@ data class CanaryRanges(
/**
* Get [scopeName] control scope from all [RunCanaryPipelineStage]s.
*/
fun List<Stage>.controlScopes(scopeName: String = "default"): List<CanaryScope> =
fun Iterable<Stage>.controlScopes(scopeName: String = "default"): List<CanaryScope> =
filter { it.type == RunCanaryPipelineStage.STAGE_TYPE }
.map { it.mapTo<CanaryScope>("/scopes/$scopeName/controlScope") }

Expand Down

0 comments on commit e1202b1

Please sign in to comment.