Skip to content

Commit

Permalink
fix(queue): don't just assume task ids are valid (#1307)
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed Apr 28, 2017
1 parent 8993de9 commit b0cbc58
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@ fun Stage<out Execution<*>>.nextTask(task: Task) =
getTasks()[index + 1]
}

/**
* @return the task with the specified id.
* @throws IllegalArgumentException if there is no such task.
*/
fun Stage<out Execution<*>>.task(taskId: String) =
getTasks().find { it.id == taskId } ?: throw IllegalArgumentException("No such task")

/**
* @return the stage with the specified [refId].
* @throws IllegalArgumentException if there is no such stage.
Expand Down
14 changes: 14 additions & 0 deletions orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Message.kt
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,20 @@ data class InvalidStageId(
this(source.executionType, source.executionId, source.application, source.stageId)
}

/**
* Task id was not found in the stage.
*/
data class InvalidTaskId(
override val executionType: Class<out Execution<*>>,
override val executionId: String,
override val application: String,
override val stageId: String,
override val taskId: String
) : ConfigurationError(), TaskLevel {
constructor(source: TaskLevel) :
this(source.executionType, source.executionId, source.application, source.stageId, source.taskId)
}

/**
* No such [Task] class.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package com.netflix.spinnaker.orca.q

import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.model.Orchestration
import com.netflix.spinnaker.orca.pipeline.model.Pipeline
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.model.*
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository

Expand All @@ -44,6 +41,20 @@ interface MessageHandler<M : Message> : (Message) -> Unit {

fun handle(message: M): Unit

fun TaskLevel.withTask(block: (Stage<*>, Task) -> Unit) =
withStage { stage ->
stage
.getTasks()
.find { it.id == taskId }
.let { task ->
if (task == null) {
queue.push(InvalidTaskId(this))
} else {
block.invoke(stage, task)
}
}
}

fun StageLevel.withStage(block: (Stage<*>) -> Unit) =
withExecution { execution ->
execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ open class CompleteTaskHandler
) : MessageHandler<CompleteTask> {

override fun handle(message: CompleteTask) {
message.withStage { stage ->
val task = stage.task(message.taskId)
message.withTask { stage, task ->
task.status = message.status
task.endTime = clock.millis()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.ExecutionStatus.PAUSED
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.*
import com.netflix.spinnaker.orca.q.MessageHandler
import com.netflix.spinnaker.orca.q.PauseStage
import com.netflix.spinnaker.orca.q.PauseTask
import com.netflix.spinnaker.orca.q.Queue
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

Expand All @@ -32,12 +35,10 @@ open class PauseTaskHandler
override val messageType = PauseTask::class.java

override fun handle(message: PauseTask) {
message.withStage { stage ->
stage.task(message.taskId).apply {
status = PAUSED
repository.storeStage(stage)
queue.push(PauseStage(message))
}
message.withTask { stage, task ->
task.status = PAUSED
repository.storeStage(stage)
queue.push(PauseStage(message))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ open class RunTaskHandler
private val log: Logger = getLogger(javaClass)

override fun handle(message: RunTask) {
message.withTask { stage, task ->
message.withTask { stage, taskModel, task ->
val execution = stage.getExecution()
if (execution.isCanceled() || execution.getStatus().complete) {
queue.push(CompleteTask(message, CANCELED))
} else if (execution.getStatus() == PAUSED) {
queue.push(PauseTask(message))
} else if (task.isTimedOut(stage, message)) {
} else if (task.isTimedOut(stage)) {
// TODO: probably want something specific in the execution log
queue.push(CompleteTask(message, TERMINAL))
} else {
Expand All @@ -82,7 +82,7 @@ open class RunTaskHandler
}
}
} catch(e: Exception) {
val exceptionDetails = shouldRetry(e, stage.task(message.taskId))
val exceptionDetails = shouldRetry(e, taskModel)
if (exceptionDetails?.shouldRetry ?: false) {
log.warn("Error running ${message.taskType.simpleName} for ${message.executionType.simpleName}[${message.executionId}]")
queue.push(message, task.backoffPeriod())
Expand Down Expand Up @@ -117,15 +117,15 @@ open class RunTaskHandler

override val messageType = RunTask::class.java

private fun RunTask.withTask(block: (Stage<*>, Task) -> Unit) =
withStage { stage ->
private fun RunTask.withTask(block: (Stage<*>, com.netflix.spinnaker.orca.pipeline.model.Task, Task) -> Unit) =
withTask { stage, taskModel ->
tasks
.find { taskType.isAssignableFrom(it.javaClass) }
.let { task ->
if (task == null) {
queue.push(InvalidTaskType(this, taskType.name))
} else {
block.invoke(stage, task)
block.invoke(stage, taskModel, task)
}
}
}
Expand All @@ -136,11 +136,10 @@ open class RunTaskHandler
else -> Duration.ofSeconds(1)
}

private fun Task.isTimedOut(stage: Stage<*>, message: RunTask): Boolean =
private fun Task.isTimedOut(stage: Stage<*>): Boolean =
when (this) {
is RetryableTask -> {
val taskModel = stage.task(message.taskId)
val startTime = Instant.ofEpochMilli(taskModel.startTime)
val startTime = Instant.ofEpochMilli(stage.getStartTime())
val pausedDuration = stage.getExecution().pausedDuration()
Duration
.between(startTime, clock.instant())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import com.netflix.spinnaker.orca.ExecutionStatus.RUNNING
import com.netflix.spinnaker.orca.Task
import com.netflix.spinnaker.orca.events.TaskStarted
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.*
import com.netflix.spinnaker.orca.q.MessageHandler
import com.netflix.spinnaker.orca.q.Queue
import com.netflix.spinnaker.orca.q.RunTask
import com.netflix.spinnaker.orca.q.StartTask
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Component
Expand All @@ -36,16 +39,14 @@ open class StartTaskHandler
) : MessageHandler<StartTask> {

override fun handle(message: StartTask) {
message.withStage { stage ->
val task = stage.task(message.taskId)
message.withTask { stage, task ->
task.status = RUNNING
task.startTime = clock.millis()
repository.storeStage(stage)

queue.push(RunTask(message, task.id, task.type))
}

publisher.publishEvent(TaskStarted(this, message.executionType, message.executionId, message.stageId, message.taskId))
publisher.publishEvent(TaskStarted(this, message.executionType, message.executionId, message.stageId, message.taskId))
}
}

override val messageType = StartTask::class.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ class RunTaskHandlerSpec : Spek({
val pipeline = pipeline {
stage {
type = "whatever"
startTime = clock.instant().toEpochMilli()
task {
id = "1"
implementingClass = DummyTask::class.qualifiedName
startTime = clock.instant().toEpochMilli()
}
}
}
Expand Down Expand Up @@ -88,10 +88,10 @@ class RunTaskHandlerSpec : Spek({
val pipeline = pipeline {
stage {
type = "whatever"
startTime = clock.instant().toEpochMilli()
task {
id = "1"
implementingClass = DummyTask::class.qualifiedName
startTime = clock.instant().toEpochMilli()
}
}
}
Expand Down Expand Up @@ -120,10 +120,10 @@ class RunTaskHandlerSpec : Spek({
val pipeline = pipeline {
stage {
type = "whatever"
startTime = clock.instant().toEpochMilli()
task {
id = "1"
implementingClass = DummyTask::class.qualifiedName
startTime = clock.instant().toEpochMilli()
}
}
}
Expand Down Expand Up @@ -198,10 +198,10 @@ class RunTaskHandlerSpec : Spek({
val pipeline = pipeline {
stage {
type = "whatever"
startTime = clock.instant().toEpochMilli()
task {
id = "1"
implementingClass = DummyTask::class.qualifiedName
startTime = clock.instant().toEpochMilli()
}
}
}
Expand Down Expand Up @@ -391,11 +391,11 @@ class RunTaskHandlerSpec : Spek({
val pipeline = pipeline {
stage {
type = "whatever"
startTime = clock.instant().minusMillis(timeout.toMillis() + 1).toEpochMilli()
task {
id = "1"
implementingClass = DummyTask::class.qualifiedName
status = RUNNING
startTime = clock.instant().minusMillis(timeout.toMillis() + 1).toEpochMilli()
}
}
}
Expand Down Expand Up @@ -430,11 +430,11 @@ class RunTaskHandlerSpec : Spek({
}
stage {
type = "whatever"
startTime = clock.instant().minusMillis(timeout.toMillis() + 1).toEpochMilli()
task {
id = "1"
implementingClass = DummyTask::class.qualifiedName
status = RUNNING
startTime = clock.instant().minusMillis(timeout.toMillis() + 1).toEpochMilli()
}
}
}
Expand Down Expand Up @@ -465,11 +465,11 @@ class RunTaskHandlerSpec : Spek({
}
stage {
type = "whatever"
startTime = clock.instant().minusMillis(timeout.plusMinutes(1).toMillis() + 1).toEpochMilli()
task {
id = "1"
implementingClass = DummyTask::class.qualifiedName
status = RUNNING
startTime = clock.instant().minusMillis(timeout.plusMinutes(1).toMillis() + 1).toEpochMilli()
}
}
}
Expand Down Expand Up @@ -502,12 +502,12 @@ class RunTaskHandlerSpec : Spek({
context["override"] = "global"
stage {
type = "whatever"
startTime = clock.instant().toEpochMilli()
context["stage"] = "foo"
context["override"] = "stage"
task {
id = "1"
implementingClass = DummyTask::class.qualifiedName
startTime = clock.instant().toEpochMilli()
}
}
}
Expand Down Expand Up @@ -541,6 +541,10 @@ class RunTaskHandlerSpec : Spek({
val pipeline = pipeline {
stage {
type = "whatever"
task {
id = "1"
implementingClass = InvalidTask::class.qualifiedName
}
}
}
val message = RunTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1", InvalidTask::class.java)
Expand Down

0 comments on commit b0cbc58

Please sign in to comment.