Skip to content

Commit

Permalink
fix(spel): Revert "fix(spel): Operate on evaluated stage in handlers. (
Browse files Browse the repository at this point in the history
…#1972)" (#2019)

This reverts commit d9e35a5.
  • Loading branch information
jtk54 committed Feb 26, 2018
1 parent b8ee2ea commit 86c3398
Showing 1 changed file with 20 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,42 +72,41 @@ class StartStageHandler(

override fun handle(message: StartStage) {
message.withStage { stage ->
val mergedContextStage = stage.withMergedContext()
if (mergedContextStage.anyUpstreamStagesFailed()) {
if (stage.anyUpstreamStagesFailed()) {
// this only happens in restart scenarios
log.warn("Tried to start stage ${mergedContextStage.id} but something upstream had failed (executionId: ${message.executionId})")
log.warn("Tried to start stage ${stage.id} but something upstream had failed (executionId: ${message.executionId})")
queue.push(CompleteExecution(message))
} else if (mergedContextStage.allUpstreamStagesComplete()) {
if (mergedContextStage.status != NOT_STARTED) {
log.warn("Ignoring $message as stage is already ${mergedContextStage.status}")
} else if (mergedContextStage.shouldSkip()) {
} else if (stage.allUpstreamStagesComplete()) {
if (stage.status != NOT_STARTED) {
log.warn("Ignoring $message as stage is already ${stage.status}")
} else if (stage.shouldSkip()) {
queue.push(SkipStage(message))
} else {
try {
mergedContextStage.withAuth {
mergedContextStage.withMergedContext().plan()
stage.withAuth {
stage.plan()
}

mergedContextStage.status = RUNNING
mergedContextStage.startTime = clock.millis()
repository.storeStage(mergedContextStage)
stage.status = RUNNING
stage.startTime = clock.millis()
repository.storeStage(stage)

mergedContextStage.start()
stage.start()

publisher.publishEvent(StageStarted(this, mergedContextStage))
trackResult(mergedContextStage)
publisher.publishEvent(StageStarted(this, stage))
trackResult(stage)
} catch(e: Exception) {
val exceptionDetails = exceptionHandlers.shouldRetry(e, mergedContextStage.name)
val exceptionDetails = exceptionHandlers.shouldRetry(e, stage.name)
if (exceptionDetails?.shouldRetry == true) {
val attempts = message.getAttribute<AttemptsAttribute>()?.attempts ?: 0
log.warn("Error planning ${mergedContextStage.type} stage for ${message.executionType}[${message.executionId}] (attempts: $attempts)")
log.warn("Error planning ${stage.type} stage for ${message.executionType}[${message.executionId}] (attempts: $attempts)")

message.setAttribute(MaxAttemptsAttribute(40))
queue.push(message, retryDelay)
} else {
log.error("Error running ${mergedContextStage.type} stage for ${message.executionType}[${message.executionId}]", e)
mergedContextStage.context["exception"] = exceptionDetails
repository.storeStage(mergedContextStage)
log.error("Error running ${stage.type} stage for ${message.executionType}[${message.executionId}]", e)
stage.context["exception"] = exceptionDetails
repository.storeStage(stage)
queue.push(CompleteStage(message))
}
}
Expand Down Expand Up @@ -143,7 +142,7 @@ class StartStageHandler(
builder().let { builder ->
builder.buildTasks(this)
builder.buildSyntheticStages(this) { it: Stage ->
repository.addStage(it)
repository.addStage(it.withMergedContext())
}
}
}
Expand Down

0 comments on commit 86c3398

Please sign in to comment.