Skip to content

Commit

Permalink
refactor(core): Change type to Instant (#2091)
Browse files Browse the repository at this point in the history
* refactor(core): Change type to Instant
  • Loading branch information
robzienert authored and robfletcher committed Mar 26, 2018
1 parent c500e56 commit 97947ec
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.time.Instant;
import java.util.*;

import static com.netflix.spinnaker.orca.ExecutionStatus.NOT_STARTED;
Expand Down Expand Up @@ -196,13 +197,14 @@ public void setEndTime(@Nullable Long endTime) {
* Gets the start ttl timestamp for this execution. If the execution has not
* started before this timestamp, the execution will immediately terminate.
*/
private Long startTimeTtl;
private Instant startTimeTtl;

public @Nullable Long getStartTimeTtl() {
public @Nullable
Instant getStartTimeTtl() {
return startTimeTtl;
}

public void setStartTimeTtl(@Nullable Long startTimeTtl) {
public void setStartTimeTtl(@Nullable Instant startTimeTtl) {
this.startTimeTtl = startTimeTtl;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.*;
import java.util.stream.Stream;

Expand Down Expand Up @@ -191,13 +192,14 @@ public void setEndTime(@Nullable Long endTime) {
* Gets the start ttl timestamp for this stage. If the stage has not started
* before this timestamp, the stage will fail.
*/
private Long startTimeTtl;
private Instant startTimeTtl;

public @Nullable Long getStartTimeTtl() {
public @Nullable
Instant getStartTimeTtl() {
return startTimeTtl;
}

public void setStartTimeTtl(@Nullable Long startTimeTtl) {
public void setStartTimeTtl(@Nullable Instant startTimeTtl) {
this.startTimeTtl = startTimeTtl;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class StartExecutionHandler(
override fun handle(message: StartExecution) {
message.withExecution { execution ->
if (execution.status == NOT_STARTED && !execution.isCanceled) {
if (execution.afterStartTimeTtl()) {
if (execution.isAfterStartTimeCutoff()) {
log.warn("Execution (type ${message.executionType}, id {}, application: {}) start was canceled because" +
"start time would be after defined start time TTL (now: ${clock.millis()}, ttl: ${execution.startTimeTtl})",
value("executionId", message.executionId),
Expand All @@ -62,23 +62,22 @@ class StartExecutionHandler(
"spinnaker",
"Could not begin execution before start time TTL"
))
return@withExecution
}
} else {
val initialStages = execution.initialStages()
if (initialStages.isEmpty()) {
log.warn("No initial stages found (executionId: ${message.executionId})")
repository.updateStatus(message.executionId, TERMINAL)
publisher.publishEvent(ExecutionComplete(this, message.executionType, message.executionId, TERMINAL))
return@withExecution
}

val initialStages = execution.initialStages()
if (initialStages.isEmpty()) {
log.warn("No initial stages found (executionId: ${message.executionId})")
repository.updateStatus(message.executionId, TERMINAL)
publisher.publishEvent(ExecutionComplete(this, message.executionType, message.executionId, TERMINAL))
return@withExecution
repository.updateStatus(message.executionId, RUNNING)
initialStages
.forEach {
queue.push(StartStage(message, it.id))
}
publisher.publishEvent(ExecutionStarted(this, message.executionType, message.executionId))
}

repository.updateStatus(message.executionId, RUNNING)
initialStages
.forEach {
queue.push(StartStage(message, it.id))
}
publisher.publishEvent(ExecutionStarted(this, message.executionType, message.executionId))
} else {
if (execution.status == CANCELED || execution.isCanceled) {
publisher.publishEvent(ExecutionComplete(this, message.executionType, message.executionId, execution.status))
Expand All @@ -92,6 +91,6 @@ class StartExecutionHandler(
}
}

private fun Execution.afterStartTimeTtl() =
startTimeTtl?.let { clock.millis() > it } ?: false
private fun Execution.isAfterStartTimeCutoff() =
startTimeTtl?.isBefore(clock.instant()) ?: false
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class StartStageHandler(
log.warn("Ignoring $message as stage is already ${stage.status}")
} else if (stage.shouldSkip()) {
queue.push(SkipStage(message))
} else if (stage.afterStartTimeTtl()) {
} else if (stage.isAfterStartTimeCutoff()) {
log.warn("Stage (${stage.id}) is being canceled because its start time is after TTL " +
"(executionId: ${message.executionId}")
queue.push(CancelStage(stage))
Expand Down Expand Up @@ -184,6 +184,6 @@ class StartStageHandler(
return OptionalStageSupport.isOptional(clonedStage.withMergedContext(), contextParameterProcessor)
}

private fun Stage.afterStartTimeTtl(): Boolean =
startTimeTtl?.let { clock.millis() > it } ?: false
private fun Stage.isAfterStartTimeCutoff(): Boolean =
startTimeTtl?.isBefore(clock.instant()) ?: false
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ object StartExecutionHandlerTest : SubjectSpek<StartExecutionHandler>({
type = singleTaskStage.type
}
}
val message = StartExecution(pipeline.type, pipeline.id, "foo")
val message = StartExecution(pipeline)

beforeGroup {
whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline
Expand All @@ -74,12 +74,7 @@ object StartExecutionHandlerTest : SubjectSpek<StartExecutionHandler>({
}

it("starts the first stage") {
verify(queue).push(StartStage(
message.executionType,
message.executionId,
"foo",
pipeline.stages.first().id
))
verify(queue).push(StartStage(pipeline.stages.first()))
}

it("publishes an event") {
Expand All @@ -98,7 +93,7 @@ object StartExecutionHandlerTest : SubjectSpek<StartExecutionHandler>({
status = ExecutionStatus.CANCELED
}

val message = StartExecution(pipeline.type, pipeline.id, "foo")
val message = StartExecution(pipeline)

beforeGroup {
whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline
Expand Down Expand Up @@ -131,7 +126,7 @@ object StartExecutionHandlerTest : SubjectSpek<StartExecutionHandler>({
isCanceled = true
}

val message = StartExecution(pipeline.type, pipeline.id, "foo")
val message = StartExecution(pipeline)

beforeGroup {
whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline
Expand Down Expand Up @@ -165,7 +160,7 @@ object StartExecutionHandlerTest : SubjectSpek<StartExecutionHandler>({
type = singleTaskStage.type
}
}
val message = StartExecution(pipeline.type, pipeline.id, "foo")
val message = StartExecution(pipeline)

beforeGroup {
whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline
Expand Down Expand Up @@ -196,7 +191,7 @@ object StartExecutionHandlerTest : SubjectSpek<StartExecutionHandler>({
requisiteStageRefIds = listOf("1")
}
}
val message = StartExecution(pipeline.type, pipeline.id, "foo")
val message = StartExecution(pipeline)

beforeGroup {
whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline
Expand Down Expand Up @@ -226,9 +221,9 @@ object StartExecutionHandlerTest : SubjectSpek<StartExecutionHandler>({
stage {
type = singleTaskStage.type
}
startTimeTtl = clock.instant().minusSeconds(30).toEpochMilli()
startTimeTtl = clock.instant().minusSeconds(30)
}
val message = StartExecution(pipeline.type, pipeline.id, "foo")
val message = StartExecution(pipeline)

beforeGroup {
whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline
Expand All @@ -241,10 +236,8 @@ object StartExecutionHandlerTest : SubjectSpek<StartExecutionHandler>({
}

it("cancels the execution") {
verify(queue, times(1)).push(CancelExecution(
message.executionType,
message.executionId,
message.application,
verify(queue).push(CancelExecution(
pipeline,
"spinnaker",
"Could not begin execution before start time TTL"
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,15 +610,15 @@ object StartStageHandlerTest : SubjectSpek<StartStageHandler>({
stage {
refId = "bar"
type = singleTaskStage.type
startTimeTtl = clock.instant().minusSeconds(30).toEpochMilli()
startTimeTtl = clock.instant().minusSeconds(30)
}
}
val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id)

beforeGroup {
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
}

afterGroup(::resetMocks)

on("receiving a message") {
Expand Down

0 comments on commit 97947ec

Please sign in to comment.