Skip to content

Commit

Permalink
fix(task): FindArtifactFromExecutionTask no longer matches its own ar…
Browse files Browse the repository at this point in the history
…tifacts (#2992)
  • Loading branch information
Pere authored and ezimanyi committed Jun 24, 2019
1 parent 5131dc8 commit 9be0a35
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ COPY . workdir/

WORKDIR workdir

RUN GRADLE_USER_HOME=cache ./gradlew -I gradle/init-publish.gradle buildDeb -x test && \
RUN GRADLE_USER_HOME=cache ./gradlew -PenablePublishing=true buildDeb -x test && \
dpkg -i ./orca-web/build/distributions/*.deb && \
cd .. && \
rm -rf workdir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
Expand All @@ -49,8 +50,19 @@ public TaskResult execute(@Nonnull Stage stage) {
FindArtifactFromExecutionContext.ExecutionOptions executionOptions =
context.getExecutionOptions();

List<Artifact> priorArtifacts =
artifactResolver.getArtifactsForPipelineId(pipeline, executionOptions.toCriteria());
List<Artifact> priorArtifacts;
// never resolve artifacts from the same stage in a prior execution
// we will get the set of the artifacts and remove them from the collection
String pipelineConfigId =
Optional.ofNullable(stage.getExecution().getPipelineConfigId()).orElse("");
if (pipelineConfigId.equals(pipeline)) {
priorArtifacts =
artifactResolver.getArtifactsForPipelineIdWithoutStageRef(
pipeline, stage.getRefId(), executionOptions.toCriteria());
} else {
priorArtifacts =
artifactResolver.getArtifactsForPipelineId(pipeline, executionOptions.toCriteria());
}

Set<Artifact> matchingArtifacts =
artifactResolver.resolveExpectedArtifacts(expectedArtifacts, priorArtifacts, null, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -97,11 +98,19 @@ public ArtifactResolver(
}

public @Nonnull List<Artifact> getAllArtifacts(@Nonnull Execution execution) {
return getAllArtifacts(execution, true, Optional.empty());
}

public @Nonnull List<Artifact> getAllArtifacts(
@Nonnull Execution execution,
Boolean includeTrigger,
Optional<Predicate<Stage>> stageFilter) {
// Get all artifacts emitted by the execution's stages; we'll sort the stages topologically,
// then reverse the result so that artifacts from later stages will appear
// earlier in the results.
List<Artifact> emittedArtifacts =
Stage.topologicalSort(execution.getStages())
.filter(stageFilter.orElse(s -> true))
.filter(s -> s.getOutputs().containsKey("artifacts"))
.flatMap(
s ->
Expand All @@ -118,11 +127,13 @@ public ArtifactResolver(
// Get all artifacts in the parent pipeline's trigger; these artifacts go at the end of the
// list,
// after any that were emitted by the pipeline
List<Artifact> triggerArtifacts =
objectMapper.convertValue(
execution.getTrigger().getArtifacts(), new TypeReference<List<Artifact>>() {});
if (includeTrigger) {
List<Artifact> triggerArtifacts =
objectMapper.convertValue(
execution.getTrigger().getArtifacts(), new TypeReference<List<Artifact>>() {});

emittedArtifacts.addAll(triggerArtifacts);
emittedArtifacts.addAll(triggerArtifacts);
}

return emittedArtifacts;
}
Expand Down Expand Up @@ -199,16 +210,23 @@ public ArtifactResolver(

public @Nonnull List<Artifact> getArtifactsForPipelineId(
@Nonnull String pipelineId, @Nonnull ExecutionCriteria criteria) {
Execution execution =
executionRepository.retrievePipelinesForPipelineConfigId(pipelineId, criteria)
.subscribeOn(Schedulers.io()).toSortedList(startTimeOrId).toBlocking().single().stream()
.findFirst()
.orElse(null);
Execution execution = getExecutionForPipelineId(pipelineId, criteria);

return execution == null ? Collections.emptyList() : getAllArtifacts(execution);
}

public void resolveArtifacts(@Nonnull Map<String, Object> pipeline) {
public @Nonnull List<Artifact> getArtifactsForPipelineIdWithoutStageRef(
@Nonnull String pipelineId, @Nonnull String stageRef, @Nonnull ExecutionCriteria criteria) {
Execution execution = getExecutionForPipelineId(pipelineId, criteria);

if (execution == null) {
return Collections.emptyList();
}

return getAllArtifacts(execution, true, Optional.of(it -> !stageRef.equals(it.getRefId())));
}

public void resolveArtifacts(@Nonnull Map pipeline) {
Map<String, Object> trigger = (Map<String, Object>) pipeline.get("trigger");
List<ExpectedArtifact> expectedArtifacts =
Optional.ofNullable((List<?>) pipeline.get("expectedArtifacts"))
Expand Down Expand Up @@ -367,6 +385,14 @@ public LinkedHashSet<Artifact> resolveExpectedArtifacts(
return resolvedArtifacts;
}

private Execution getExecutionForPipelineId(
@Nonnull String pipelineId, @Nonnull ExecutionCriteria criteria) {
return executionRepository.retrievePipelinesForPipelineConfigId(pipelineId, criteria)
.subscribeOn(Schedulers.io()).toSortedList(startTimeOrId).toBlocking().single().stream()
.findFirst()
.orElse(null);
}

private static class ArtifactResolutionException extends RuntimeException {
ArtifactResolutionException(String message, Throwable cause) {
super(message, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.kork.artifacts.model.Artifact
import com.netflix.spinnaker.kork.artifacts.model.ExpectedArtifact
import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.pipeline.model.DefaultTrigger
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import rx.Observable
Expand All @@ -43,13 +44,17 @@ class ArtifactResolverSpec extends Specification {

def executionRepository = Stub(ExecutionRepository) {
// only a call to retrievePipelinesForPipelineConfigId() with these argument values is expected
retrievePipelinesForPipelineConfigId(pipelineId, expectedExecutionCriteria) >> Observable.empty();
retrievePipelinesForPipelineConfigId(pipelineId, expectedExecutionCriteria) >> Observable.empty()
// any other interaction is unexpected
0 * _
}

def makeArtifactResolver() {
return new ArtifactResolver(new ObjectMapper(), executionRepository,
return makeArtifactResolverWithStub(executionRepository)
}

def makeArtifactResolverWithStub(ExecutionRepository executionRepositoryStub) {
return new ArtifactResolver(new ObjectMapper(), executionRepositoryStub,
new ContextParameterProcessor())
}

Expand Down Expand Up @@ -309,6 +314,86 @@ class ArtifactResolverSpec extends Specification {
artifacts*.type == ["2", "1", "trigger"]
}

def "should find artifacts from a specific pipeline"() {
when:
def execution = pipeline {
id: pipelineId
status: ExecutionStatus.SUCCEEDED
stage {
refId = "1"
outputs.artifacts = [new Artifact(type: "1")]
}
stage {
refId = "2"
requisiteStageRefIds = ["1"]
outputs.artifacts = [new Artifact(type: "2")]
}
stage {
// This stage does not emit an artifacts
requisiteStageRefIds = ["2"]
}
}
execution.trigger = new DefaultTrigger("webhook", null, "user", [:], [new Artifact(type: "trigger")])

def executionCriteria = new ExecutionRepository.ExecutionCriteria()
executionCriteria.setStatuses(ExecutionStatus.SUCCEEDED)

def executionTerminalCriteria = new ExecutionRepository.ExecutionCriteria()
executionTerminalCriteria.setStatuses(ExecutionStatus.TERMINAL)

def executionRepositoryStub = Stub(ExecutionRepository) {
// only a call to retrievePipelinesForPipelineConfigId() with these argument values is expected
retrievePipelinesForPipelineConfigId(pipelineId, executionCriteria) >> Observable.just(execution)
retrievePipelinesForPipelineConfigId(pipelineId, executionTerminalCriteria) >> Observable.empty()
// any other interaction is unexpected
0 * _
}

def artifactResolver = makeArtifactResolverWithStub(executionRepositoryStub)

then:
def artifacts = artifactResolver.getArtifactsForPipelineId(pipelineId, executionCriteria)
artifacts.size == 3
artifacts*.type == ["2", "1", "trigger"]

def emptyArtifacts = artifactResolver.getArtifactsForPipelineId(pipelineId, executionTerminalCriteria)
emptyArtifacts == []
}

def "should find artifacts without a specific stage ref"() {
when:
def execution = pipeline {
id: pipelineId
stage {
refId = "1"
outputs.artifacts = [new Artifact(type: "1")]
}
stage {
refId = "2"
requisiteStageRefIds = ["1"]
outputs.artifacts = [new Artifact(type: "2")]
}
stage {
// This stage does not emit an artifacts
requisiteStageRefIds = ["2"]
}
}
execution.trigger = new DefaultTrigger("webhook", null, "user", [:], [new Artifact(type: "trigger")])

def executionRepositoryStub = Stub(ExecutionRepository) {
// only a call to retrievePipelinesForPipelineConfigId() with these argument values is expected
retrievePipelinesForPipelineConfigId(pipelineId, expectedExecutionCriteria) >> Observable.just(execution)
// any other interaction is unexpected
0 * _
}

def artifactResolver = makeArtifactResolverWithStub(executionRepositoryStub)

then:
def artifacts = artifactResolver.getArtifactsForPipelineIdWithoutStageRef(pipelineId, "2", expectedExecutionCriteria)
artifacts.size == 2
artifacts*.type == ["1", "trigger"]
}
@Unroll
def "should resolve expected artifacts from pipeline for #expectedArtifacts using #available and prior #prior"() {
when:
Expand Down

0 comments on commit 9be0a35

Please sign in to comment.