Skip to content

Commit

Permalink
working
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdulRahman AlHamali committed Mar 25, 2021
1 parent 4b9d07b commit 4d05b2b
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ public PipelineExecution restartStage(@Nonnull String executionId, @Nonnull Stri
}

public PipelineExecution ignoreStageFailure(
@Nonnull String executionId, @Nonnull String stageId) {
@Nonnull String executionId, @Nonnull String stageId, String reason) {
PipelineExecution execution = repository.retrieve(ExecutionType.PIPELINE, executionId);
if (repository.handlesPartition(execution.getPartition())) {
runner.ignoreFailure(execution, stageId);
runner.ignoreFailure(execution, stageId, reason);
} else {
log.info(
"Not pushing queue message action='ignoreFailure' for execution with foreign partition='{}'",
execution.getPartition());
repository.ignoreStageFailure(executionId, stageId);
repository.ignoreStageFailure(executionId, stageId, reason);
}
return execution;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ default void restart(@Nonnull PipelineExecution execution, @Nonnull String stage
throw new UnsupportedOperationException();
}

default void ignoreFailure(@Nonnull PipelineExecution execution, @Nonnull String stageId) {
default void ignoreFailure(
@Nonnull PipelineExecution execution, @Nonnull String stageId, String reason) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ default boolean handlesPartition(@Nullable String partitionOfExecution) {
// foreign executions
default void restartStage(String executionId, String stageId) {}

default void ignoreStageFailure(String executionId, String stageId) {}
default void ignoreStageFailure(String executionId, String stageId, String reason) {}

final class ExecutionCriteria {
private int pageSize = 3500;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,24 @@ public class IgnoreStageFailureInterlinkEvent implements InterlinkEvent {
@Nonnull ExecutionType executionType;
@Nonnull String executionId;
@Nonnull String stageId;
String reason;

public IgnoreStageFailureInterlinkEvent(
@Nonnull ExecutionType executionType, @Nonnull String executionId, @Nonnull String stageId) {
@Nonnull ExecutionType executionType,
@Nonnull String executionId,
@Nonnull String stageId,
String reason) {
// for the moment, only ExecutionType.PIPELINE can have ignored stages
// but since we are defining the protocol on the wire here, let's be a bit future proof and
// accept potentially different execution types
this.executionType = executionType;
this.executionId = executionId;
this.stageId = stageId;
this.reason = reason;
}

@Override
public void applyTo(CompoundExecutionOperator executionOperator) {
executionOperator.ignoreStageFailure(executionId, stageId);
executionOperator.ignoreStageFailure(executionId, stageId, reason);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class QueueExecutionRunner(
queue.push(RestartStage(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null)))
}

override fun ignoreFailure(execution: PipelineExecution, stageId: String) {
queue.push(IgnoreStageFailure(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null)))
override fun ignoreFailure(execution: PipelineExecution, stageId: String, reason: String?) {
queue.push(IgnoreStageFailure(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null), reason))
}

override fun unpause(execution: PipelineExecution) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class IgnoreStageFailureHandler(
}
} else {
stage.status = FAILED_CONTINUE
stage.addIgnoreFailureDetails(message.user)
stage.addIgnoreFailureDetails(message.user, message.reason)
repository.storeStage(stage)

val topLevelStage = stage.topLevelStage
Expand All @@ -74,10 +74,11 @@ class IgnoreStageFailureHandler(
}
}

private fun StageExecution.addIgnoreFailureDetails(user: String?) {
private fun StageExecution.addIgnoreFailureDetails(user: String?, reason: String?) {
context["ignoreFailureDetails"] = mapOf(
"failureIgnoredBy" to (user ?: "anonymous"),
"failureIgnoreTime" to clock.millis(),
"by" to (user ?: "anonymous"),
"reason" to (reason ?: "unspecified"),
"time" to clock.millis(),
"previousException" to context.remove("exception")
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,14 @@ data class IgnoreStageFailure(
override val executionId: String,
override val application: String,
override val stageId: String,
val user: String?
val user: String?,
val reason: String?
) : Message(), StageLevel {
constructor(source: PipelineExecution, stageId: String, user: String?) :
this(source.type, source.id, source.application, stageId, user)
constructor(source: PipelineExecution, stageId: String, user: String?, reason: String?) :
this(source.type, source.id, source.application, stageId, user, reason)

constructor(stage: StageExecution, user: String?) :
this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user)
constructor(stage: StageExecution, user: String?, reason: String?) :
this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user, reason)
}

@JsonTypeName("resumeStage")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ object IgnoreStageFailureHandlerTest : SubjectSpek<IgnoreStageFailureHandler>({
status = NOT_STARTED
}
}
val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com")
val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null)

beforeGroup {
whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline
Expand Down Expand Up @@ -152,7 +152,7 @@ object IgnoreStageFailureHandlerTest : SubjectSpek<IgnoreStageFailureHandler>({
endTime = clock.instant().minus(59, MINUTES).toEpochMilli()
}
}
val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com")
val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null)

beforeGroup {
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
Expand Down Expand Up @@ -220,7 +220,7 @@ object IgnoreStageFailureHandlerTest : SubjectSpek<IgnoreStageFailureHandler>({
status = NOT_STARTED
}
}
val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com")
val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null)

beforeGroup {
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
Expand Down Expand Up @@ -285,7 +285,7 @@ object IgnoreStageFailureHandlerTest : SubjectSpek<IgnoreStageFailureHandler>({
startTime = clock.instant().minus(1, HOURS).toEpochMilli()
}
}
val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com")
val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null)

beforeGroup {
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ class SqlExecutionRepository(
}
}

override fun ignoreStageFailure(executionId: String, stageId: String) {
doForeignAware(IgnoreStageFailureInterlinkEvent(PIPELINE, executionId, stageId)) {
override fun ignoreStageFailure(executionId: String, stageId: String, reason: String?) {
doForeignAware(IgnoreStageFailureInterlinkEvent(PIPELINE, executionId, stageId, reason)) {
_, _ -> log.debug("ignoreStageFailure is a no-op for local executions")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,13 +508,13 @@ class TaskController {
@PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'EXECUTE')")
@RequestMapping(value = "/pipelines/{id}/stages/{stageId}/ignoreFailure", method = RequestMethod.PUT)
PipelineExecution ignoreFailureOfPipelineStage(
@PathVariable String id, @PathVariable String stageId) {
@PathVariable String id, @PathVariable String stageId, @RequestBody Map details) {
def pipeline = executionRepository.retrieve(PIPELINE, id)
def stage = pipeline.stageById(stageId)
if (!(boolean) stage.context.getCurrentOnly("allowIgnoreFailure", false)) {
throw new CannotUpdateExecutionStage("Stage does not allow ignoreFailure action")
}
return executionOperator.ignoreStageFailure(id, stageId)
return executionOperator.ignoreStageFailure(id, stageId, details["reason"])
}

@PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'READ')")
Expand Down

0 comments on commit 4d05b2b

Please sign in to comment.