Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdulRahman AlHamali committed Mar 22, 2021
1 parent fa7d423 commit 40ecffd
Show file tree
Hide file tree
Showing 11 changed files with 524 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ public PipelineExecution restartStage(@Nonnull String executionId, @Nonnull Stri
return execution;
}

public PipelineExecution ignoreStageFailure(
@Nonnull String executionId, @Nonnull String stageId) {
PipelineExecution execution = repository.retrieve(ExecutionType.PIPELINE, executionId);
if (repository.handlesPartition(execution.getPartition())) {
runner.ignoreFailure(execution, stageId);
} else {
log.info(
"Not pushing queue message action='ignoreFailure' for execution with foreign partition='{}'",
execution.getPartition());
repository.ignoreStageFailure(executionId, stageId);
}
return execution;
}

private PipelineExecution doInternal(
Consumer<PipelineExecution> runnerAction,
Runnable repositoryAction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ default void restart(@Nonnull PipelineExecution execution, @Nonnull String stage
throw new UnsupportedOperationException();
}

default void ignoreFailure(@Nonnull PipelineExecution execution, @Nonnull String stageId) {
throw new UnsupportedOperationException();
}

default void reschedule(@Nonnull PipelineExecution execution) {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ default boolean handlesPartition(@Nullable String partitionOfExecution) {
// foreign executions
default void restartStage(String executionId, String stageId) {}

default void ignoreStageFailure(String executionId, String stageId) {}

final class ExecutionCriteria {
private int pageSize = 3500;
private Collection<ExecutionStatus> statuses = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.interlink.events;

import static com.netflix.spinnaker.orca.interlink.events.InterlinkEvent.EventType.IGNORE_FAILURE;

import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType;
import com.netflix.spinnaker.orca.pipeline.CompoundExecutionOperator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* This event is published on the interlink as a result of a user IGNORING THE FAILURE of a stage on
* an orca instance that can't handle the partition for the given execution.
*
* <p>The event is then handled by an orca instance (listening on interlink) whose partition matches
* that of the execution. The resulting repository mutations of this event will then be peered by
* the PeeringAgent
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class IgnoreStageFailureInterlinkEvent implements InterlinkEvent {
final EventType eventType = IGNORE_FAILURE;
@Nullable String partition;
@Nonnull ExecutionType executionType;
@Nonnull String executionId;
@Nonnull String stageId;

public IgnoreStageFailureInterlinkEvent(
@Nonnull ExecutionType executionType, @Nonnull String executionId, @Nonnull String stageId) {
// for the moment, only ExecutionType.PIPELINE can have ignored stages
// but since we are defining the protocol on the wire here, let's be a bit future proof and
// accept potentially different execution types
this.executionType = executionType;
this.executionId = executionId;
this.stageId = stageId;
}

@Override
public void applyTo(CompoundExecutionOperator executionOperator) {
executionOperator.ignoreStageFailure(executionId, stageId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
@JsonSubTypes.Type(value = ResumeInterlinkEvent.class, name = "RESUME"),
@JsonSubTypes.Type(value = DeleteInterlinkEvent.class, name = "DELETE"),
@JsonSubTypes.Type(value = PatchStageInterlinkEvent.class, name = "PATCH"),
@JsonSubTypes.Type(value = RestartStageInterlinkEvent.class, name = "RESTART")
@JsonSubTypes.Type(value = RestartStageInterlinkEvent.class, name = "RESTART"),
@JsonSubTypes.Type(value = IgnoreStageFailureInterlinkEvent.class, name = "IGNORE_FAILURE")
})
public interface InterlinkEvent {
enum EventType {
Expand All @@ -53,7 +54,8 @@ enum EventType {
DELETE,
RESUME,
PATCH,
RESTART
RESTART,
IGNORE_FAILURE
}

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class QueueExecutionRunner(
queue.push(RestartStage(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null)))
}

override fun ignoreFailure(execution: PipelineExecution, stageId: String) {
queue.push(IgnoreStageFailure(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null)))
}

override fun unpause(execution: PipelineExecution) {
queue.push(ResumeExecution(execution))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.*
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilderFactory
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.IgnoreStageFailure
import com.netflix.spinnaker.orca.q.pending.PendingExecutionService
import com.netflix.spinnaker.q.Queue
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.time.Clock

@Component
class IgnoreStageFailureHandler(
override val queue: Queue,
override val repository: ExecutionRepository,
override val stageDefinitionBuilderFactory: StageDefinitionBuilderFactory,
private val pendingExecutionService: PendingExecutionService,
private val clock: Clock
) : OrcaMessageHandler<IgnoreStageFailure>, StageBuilderAware {

override val messageType = IgnoreStageFailure::class.java

private val log: Logger get() = LoggerFactory.getLogger(javaClass)

override fun handle(message: IgnoreStageFailure) {
message.withStage { stage ->
if (!stage.status.isHalt) {
log.warn("Attempting to ignore the failure of stage $stage which is not halted. Will ignore")
} else if (stage.execution.shouldQueue()) {
// this pipeline is already running and has limitConcurrent = true
stage.execution.pipelineConfigId?.let {
log.info("Queueing IgnoreStageFailure of {} {} {}", stage.execution.application, stage.execution.name, stage.execution.id)
pendingExecutionService.enqueue(it, message)
}
} else {
stage.status = FAILED_CONTINUE
stage.addIgnoreFailureDetails(message.user)
repository.storeStage(stage)

val topLevelStage = stage.topLevelStage
if (topLevelStage != stage) {
topLevelStage.status = RUNNING
repository.storeStage(topLevelStage)
}

val execution = topLevelStage.execution
if (execution.status.isHalt) {
stage.execution.updateStatus(RUNNING)
repository.updateStatus(execution)
}

stage.startNext()
}
}
}

private fun StageExecution.addIgnoreFailureDetails(user: String?) {
context["ignoreFailureDetails"] = mapOf(
"failureIgnoredBy" to (user ?: "anonymous"),
"failureIgnoreTime" to clock.millis(),
"previousException" to context.remove("exception")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,21 @@ data class RestartStage(
this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user)
}

@JsonTypeName("ignoreStageFailure")
data class IgnoreStageFailure(
override val executionType: ExecutionType,
override val executionId: String,
override val application: String,
override val stageId: String,
val user: String?
) : Message(), StageLevel {
constructor(source: PipelineExecution, stageId: String, user: String?) :
this(source.type, source.id, source.application, stageId, user)

constructor(stage: StageExecution, user: String?) :
this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user)
}

@JsonTypeName("resumeStage")
data class ResumeStage(
override val executionType: ExecutionType,
Expand Down
Loading

0 comments on commit 40ecffd

Please sign in to comment.