Skip to content

Commit

Permalink
feat(execution): Adding start time TTLs to executions and stages
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert authored and robfletcher committed Mar 26, 2018
1 parent 104cdb2 commit 8b235bb
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,20 @@ public void setEndTime(@Nullable Long endTime) {
this.endTime = endTime;
}

/**
* Gets the start ttl timestamp for this execution. If the execution has not
* started before this timestamp, the execution will immediately terminate.
*/
private Long startTimeTtl;

public @Nullable Long getStartTimeTtl() {
return startTimeTtl;
}

public void setStartTimeTtl(@Nullable Long startTimeTtl) {
this.startTimeTtl = startTimeTtl;
}

private ExecutionStatus status = NOT_STARTED;

public @Nonnull ExecutionStatus getStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,20 @@ public void setEndTime(@Nullable Long endTime) {
this.endTime = endTime;
}

/**
* Gets the start ttl timestamp for this stage. If the stage has not started
* before this timestamp, the stage will fail.
*/
private Long startTimeTtl;

public @Nullable Long getStartTimeTtl() {
return startTimeTtl;
}

public void setStartTimeTtl(@Nullable Long startTimeTtl) {
this.startTimeTtl = startTimeTtl;
}

/**
* The execution status for this stage
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import com.netflix.spinnaker.orca.ExecutionStatus.*
import com.netflix.spinnaker.orca.events.ExecutionComplete
import com.netflix.spinnaker.orca.events.ExecutionStarted
import com.netflix.spinnaker.orca.ext.initialStages
import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.CancelExecution
import com.netflix.spinnaker.orca.q.StartExecution
import com.netflix.spinnaker.orca.q.StartStage
import com.netflix.spinnaker.q.Queue
Expand All @@ -30,12 +32,14 @@ import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Component
import java.time.Clock

@Component
class StartExecutionHandler(
override val queue: Queue,
override val repository: ExecutionRepository,
@Qualifier("queueEventPublisher") private val publisher: ApplicationEventPublisher
@Qualifier("queueEventPublisher") private val publisher: ApplicationEventPublisher,
private val clock: Clock
) : OrcaMessageHandler<StartExecution> {

override val messageType = StartExecution::class.java
Expand All @@ -46,6 +50,21 @@ class StartExecutionHandler(
override fun handle(message: StartExecution) {
message.withExecution { execution ->
if (execution.status == NOT_STARTED && !execution.isCanceled) {
if (execution.afterStartTimeTtl()) {
log.warn("Execution (type ${message.executionType}, id {}, application: {}) start was canceled because" +
"start time would be after defined start time TTL (now: ${clock.millis()}, ttl: ${execution.startTimeTtl})",
value("executionId", message.executionId),
value("application", message.application))
queue.push(CancelExecution(
message.executionType,
message.executionId,
message.application,
"spinnaker",
"Could not begin execution before start time TTL"
))
return@withExecution
}

val initialStages = execution.initialStages()
if (initialStages.isEmpty()) {
log.warn("No initial stages found (executionId: ${message.executionId})")
Expand All @@ -72,4 +91,7 @@ class StartExecutionHandler(
}
}
}

private fun Execution.afterStartTimeTtl() =
startTimeTtl?.let { clock.millis() > it } ?: false
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class StartStageHandler(
log.warn("Ignoring $message as stage is already ${stage.status}")
} else if (stage.shouldSkip()) {
queue.push(SkipStage(message))
} else if (stage.afterStartTimeTtl()) {
log.warn("Stage (${stage.id}) is being canceled because its start time is after TTL " +
"(executionId: ${message.executionId}")
queue.push(CancelStage(stage))
} else {
try {
stage.withAuth {
Expand Down Expand Up @@ -179,4 +183,7 @@ class StartStageHandler(

return OptionalStageSupport.isOptional(clonedStage.withMergedContext(), contextParameterProcessor)
}

private fun Stage.afterStartTimeTtl(): Boolean =
startTimeTtl?.let { clock.millis() > it } ?: false
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import com.netflix.spinnaker.orca.events.ExecutionStarted
import com.netflix.spinnaker.orca.fixture.pipeline
import com.netflix.spinnaker.orca.fixture.stage
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.CancelExecution
import com.netflix.spinnaker.orca.q.StartExecution
import com.netflix.spinnaker.orca.q.StartStage
import com.netflix.spinnaker.orca.q.singleTaskStage
import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.time.fixedClock
import com.nhaarman.mockito_kotlin.*
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.dsl.context
Expand All @@ -40,9 +42,10 @@ object StartExecutionHandlerTest : SubjectSpek<StartExecutionHandler>({
val queue: Queue = mock()
val repository: ExecutionRepository = mock()
val publisher: ApplicationEventPublisher = mock()
val clock = fixedClock()

subject(GROUP) {
StartExecutionHandler(queue, repository, publisher)
StartExecutionHandler(queue, repository, publisher, clock)
}

fun resetMocks() = reset(queue, repository, publisher)
Expand Down Expand Up @@ -217,5 +220,35 @@ object StartExecutionHandlerTest : SubjectSpek<StartExecutionHandler>({
})
}
}

context("with a start time after ttl") {
val pipeline = pipeline {
stage {
type = singleTaskStage.type
}
startTimeTtl = clock.instant().minusSeconds(30).toEpochMilli()
}
val message = StartExecution(pipeline.type, pipeline.id, "foo")

beforeGroup {
whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline
}

afterGroup(::resetMocks)

action("the handler receives the message") {
subject.handle(message)
}

it("cancels the execution") {
verify(queue, times(1)).push(CancelExecution(
message.executionType,
message.executionId,
message.application,
"spinnaker",
"Could not begin execution before start time TTL"
))
}
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,34 @@ object StartStageHandlerTest : SubjectSpek<StartStageHandler>({
}
}

given("the stage has a start time after ttl") {
val pipeline = pipeline {
application = "foo"
stage {
refId = "bar"
type = singleTaskStage.type
startTimeTtl = clock.instant().minusSeconds(30).toEpochMilli()
}
}
val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id)

beforeGroup {
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
}

afterGroup(::resetMocks)

on("receiving a message") {
subject.handle(message)
}

it("cancels the stage") {
verify(queue).push(CancelStage(
pipeline.stageByRef("bar")
))
}
}

given("an exception is thrown planning the stage") {
val pipeline = pipeline {
application = "covfefe"
Expand Down

0 comments on commit 8b235bb

Please sign in to comment.