Skip to content

Commit

Permalink
fix(queue): prevent echo listener from overwriting data from task sta…
Browse files Browse the repository at this point in the history
…rting
  • Loading branch information
robfletcher committed May 11, 2017
1 parent 3abaa9f commit ba9d1bd
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ public Stage(T execution, String type) {
@JsonIgnore
private final AtomicInteger stageCounter = new AtomicInteger(0);

public Task taskById(String taskId) {
return tasks
.stream()
.filter(it -> it.getId().equals(taskId))
.findFirst()
.orElse(null);
}

/**
* Gets the last stage preceding this stage that has the specified type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public interface ExecutionRepository {

void storeStage(Stage<? extends Execution> stage);

void updateStageContext(Stage<? extends Execution> stage);

void removeStage(Execution execution, String stageId);

void addStage(Stage stage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ class JedisExecutionRepository implements ExecutionRepository {
}
}

@Override
void updateStageContext(Stage<? extends Execution> stage) {
def execution = stage.execution
def type = execution.getClass().simpleName.toLowerCase()
def key = "${type}:${execution.id}"
withJedis(getJedisPoolForId(key)) { Jedis jedis ->
jedis.hset(key, "stage.${stage.id}.context", mapper.writeValueAsString(stage.context))
}
}

@Override
void removeStage(Execution execution, String stageId) {
Class<? extends Execution> executionType = execution.getClass()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class EchoConfiguration {
}

@Bean
EchoNotifyingStageListener echoNotifyingStageExecutionListener(EchoService echoService) {
new EchoNotifyingStageListener(echoService)
EchoNotifyingStageListener echoNotifyingStageExecutionListener(EchoService echoService, ExecutionRepository repository) {
new EchoNotifyingStageListener(echoService, repository)
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.netflix.spinnaker.orca.echo.EchoService
import com.netflix.spinnaker.orca.listeners.Persister
import com.netflix.spinnaker.orca.listeners.StageListener
import com.netflix.spinnaker.orca.pipeline.model.*
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand All @@ -21,10 +22,12 @@ import static java.lang.System.currentTimeMillis
class EchoNotifyingStageListener implements StageListener {

private final EchoService echoService
private final ExecutionRepository repository

@Autowired
EchoNotifyingStageListener(EchoService echoService) {
EchoNotifyingStageListener(EchoService echoService, ExecutionRepository repository) {
this.echoService = echoService
this.repository = repository
}

@Override
Expand All @@ -50,7 +53,7 @@ class EchoNotifyingStageListener implements StageListener {
isSynthetic: stage.syntheticStageOwner != null
]
stage.context.stageDetails = details
persister.save(stage)
repository.updateStageContext(stage)

log.debug("***** $stage.execution.id Echo stage $stage.name starting v2")
recordEvent("stage", "starting", stage)
Expand Down Expand Up @@ -78,7 +81,7 @@ class EchoNotifyingStageListener implements StageListener {
if (stage.endTime) {
stage.context.stageDetails.endTime = stage.endTime
}
persister.save(stage)
repository.updateStageContext(stage)

// STOPPED stages are "successful" because they allow the pipeline to
// proceed but they are still failures in terms of the stage and should
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ abstract class EchoEventIntegrationSpec<R extends ExecutionRunner> extends Speci
}

@Bean
EchoNotifyingStageListener echoNotifyingStageListener(EchoService echoService) {
new EchoNotifyingStageListener(echoService)
EchoNotifyingStageListener echoNotifyingStageListener(EchoService echoService, ExecutionRepository repository) {
new EchoNotifyingStageListener(echoService, repository)
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.netflix.spinnaker.orca.pipeline.model.Orchestration
import com.netflix.spinnaker.orca.pipeline.model.Pipeline
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.model.Task
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Subject
Expand All @@ -16,9 +17,10 @@ class EchoNotifyingStageListenerSpec extends Specification {

def echoService = Mock(EchoService)
def persister = Stub(Persister)
def repository = Mock(ExecutionRepository)

@Subject
def echoListener = new EchoNotifyingStageListener(echoService)
def echoListener = new EchoNotifyingStageListener(echoService, repository)

@Shared
def pipelineStage = new Stage<>(new Pipeline(), "test", "test", [:])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class RedisQueue(
private fun Jedis.pop(from: String, to: String, delay: TemporalAmount = ZERO) =
zrangeByScore(from, 0.0, score(), 0, 1)
.takeIf {
// TODO: this isn't right, `it` is a set (often an empty one)
getSet("$locksKey:$it", currentInstanceId) in listOf(null, currentInstanceId)
}
?.also {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ interface MessageHandler<M : Message> : (Message) -> Unit {
fun TaskLevel.withTask(block: (Stage<*>, Task) -> Unit) =
withStage { stage ->
stage
.getTasks()
.find { it.id == taskId }
.taskById(taskId)
.let { task ->
if (task == null) {
queue.push(InvalidTaskId(this))
Expand All @@ -56,8 +55,7 @@ interface MessageHandler<M : Message> : (Message) -> Unit {
fun StageLevel.withStage(block: (Stage<*>) -> Unit) =
withExecution { execution ->
execution
.getStages()
.find { it.getId() == stageId }
.stageById(stageId)
.let { stage ->
if (stage == null) {
queue.push(InvalidStageId(this))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ open class StartTaskHandler
task.status = RUNNING
task.startTime = clock.millis()
repository.storeStage(stage)

queue.push(RunTask(message, task.id, task.type))

publisher.publishEvent(TaskStarted(this, stage, task))
Expand Down

0 comments on commit ba9d1bd

Please sign in to comment.