Skip to content

Commit

Permalink
Revert "fix(tasks): Only lookup task by type in RunTaskHandler; remov…
Browse files Browse the repository at this point in the history
…e code introduced to support one-off case for now obsolete SimpleTask (#3939)" (#3941)

This reverts commit 6a16810.
  • Loading branch information
jonsie committed Oct 2, 2020
1 parent 72309b8 commit 59c995f
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 16 deletions.
Expand Up @@ -169,8 +169,23 @@ public GraphType getType() {
}
}

/**
* This is an abstraction above TaskDefinition that allows more flexibility for the implementing
* class name.
*/
interface DefinedTask {

/** Returns the name of the task */
@Nonnull
String getName();

/** Returns the name of the class implementing the stage */
@Nonnull
String getImplementingClassName();
}

/** An individual task. */
class TaskDefinition implements TaskNode {
class TaskDefinition implements TaskNode, DefinedTask {
private final String name;
private final Class<? extends Task> implementingClass;

Expand All @@ -179,12 +194,18 @@ public TaskDefinition(@Nonnull String name, @Nonnull Class<? extends Task> imple
this.implementingClass = implementingClass;
}

@Override
public @Nonnull String getName() {
return name;
}

public @Nonnull Class<? extends Task> getImplementingClass() {
return implementingClass;
}

@Override
public @Nonnull String getImplementingClassName() {
return getImplementingClass().getCanonicalName();
}
}
}
Expand Up @@ -86,8 +86,6 @@ private void computeTasks() {
/**
* Fetch a {@code Task} by {@code taskTypeIdentifier}.
*
* <p>If the task is not found from the type, attempts to re-compute tasks and lookup again.
*
* @param taskTypeIdentifier Task identifier (class name or alias)
* @return the Task matching {@code taskTypeIdentifier}
* @throws NoSuchTaskException if Task does not exist
Expand All @@ -112,7 +110,8 @@ public Task getTask(@Nonnull String taskTypeIdentifier) {
/**
* Fetch a {@code Task} by {@code Class type}.
*
* <p>If the task is not found from the type, attempts to re-compute tasks and lookup again.
* <p>This method is used as a fallback when looking up tasks, so if the task is not found from
* the type, attempts to re-compute tasks and lookup again.
*
* @param taskType Task type (class of task)
* @return the Task matching {@code taskType}
Expand Down
Expand Up @@ -20,7 +20,7 @@ import com.netflix.spinnaker.orca.api.pipeline.SyntheticStageOwner
import com.netflix.spinnaker.orca.api.pipeline.SyntheticStageOwner.STAGE_BEFORE
import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder
import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode
import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode.TaskDefinition
import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode.DefinedTask
import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode.TaskGraph
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
Expand Down Expand Up @@ -52,11 +52,11 @@ private fun processTaskNode(
) {
element.apply {
when (value) {
is TaskDefinition -> {
is DefinedTask -> {
val task = TaskExecutionImpl()
task.id = (stage.tasks.size + 1).toString()
task.name = value.name
task.implementingClass = value.implementingClass.name
task.implementingClass = value.implementingClassName
if (isSubGraph) {
task.isLoopStart = isFirst
task.isLoopEnd = isLast
Expand Down
Expand Up @@ -232,10 +232,14 @@ class RunTaskHandler(
private fun RunTask.withTask(block: (StageExecution, TaskExecution, Task) -> Unit) =
withTask { stage, taskModel ->
try {
taskResolver.getTask(taskType)
taskResolver.getTask(taskModel.implementingClass)
} catch (e: TaskResolver.NoSuchTaskException) {
queue.push(InvalidTaskType(this, taskType.name))
null
try {
taskResolver.getTask(taskType)
} catch (e: TaskResolver.NoSuchTaskException) {
queue.push(InvalidTaskType(this, taskType.name))
null
}
}?.let {
block.invoke(stage, taskModel, it)
}
Expand Down
Expand Up @@ -43,6 +43,7 @@ import com.netflix.spinnaker.orca.pipeline.RestrictExecutionDuringTimeWindow
import com.netflix.spinnaker.orca.pipeline.model.DefaultTrigger
import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl.STAGE_TIMEOUT_OVERRIDE_KEY
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask
import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor
import com.netflix.spinnaker.orca.pipeline.util.StageNavigator
import com.netflix.spinnaker.orca.q.CompleteTask
Expand Down Expand Up @@ -271,6 +272,141 @@ object RunTaskHandlerTest : SubjectSpek<RunTaskHandler>({
}
}

describe("that completes successfully prefering specified TaskExecution implementingClass") {
val pipeline = pipeline {
stage {
type = "whatever"
task {
id = "1"
startTime = clock.instant().toEpochMilli()
implementingClass = task.javaClass.canonicalName
}
}
}
val stage = pipeline.stages.first()
val message = RunTask(pipeline.type, pipeline.id, "foo", stage.id, "1", WaitTask::class.java)

and("has no context updates outputs") {
val taskResult = TaskResult.SUCCEEDED

beforeGroup {
tasks.forEach { whenever(it.extensionClass) doReturn it::class.java }
taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage }
taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult }
whenever(task.execute(any())) doReturn taskResult
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
}

afterGroup(::resetMocks)

action("the handler receives a message") {
subject.handle(message)
}

it("executes the task") {
verify(task).execute(pipeline.stages.first())
}

it("completes the task") {
verify(queue).push(
check<CompleteTask> {
assertThat(it.status).isEqualTo(SUCCEEDED)
}
)
}

it("does not update the stage or global context") {
verify(repository, never()).storeStage(any())
}
}

and("has context updates") {
val stageOutputs = mapOf("foo" to "covfefe")
val taskResult = TaskResult.builder(SUCCEEDED).context(stageOutputs).build()

beforeGroup {
tasks.forEach { whenever(it.extensionClass) doReturn it::class.java }
taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage }
taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult }
whenever(task.execute(any())) doReturn taskResult
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
}

afterGroup(::resetMocks)

action("the handler receives a message") {
subject.handle(message)
}

it("updates the stage context") {
verify(repository).storeStage(
check {
assertThat(stageOutputs).isEqualTo(it.context)
}
)
}
}

and("has outputs") {
val outputs = mapOf("foo" to "covfefe")
val taskResult = TaskResult.builder(SUCCEEDED).outputs(outputs).build()

beforeGroup {
tasks.forEach { whenever(it.extensionClass) doReturn it::class.java }
taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage }
taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult }
whenever(task.execute(any())) doReturn taskResult
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
}

afterGroup(::resetMocks)

action("the handler receives a message") {
subject.handle(message)
}

it("updates the stage outputs") {
verify(repository).storeStage(
check {
assertThat(it.outputs).isEqualTo(outputs)
}
)
}
}

and("outputs a stageTimeoutMs value") {
val outputs = mapOf(
"foo" to "covfefe",
"stageTimeoutMs" to Long.MAX_VALUE
)
val taskResult = TaskResult.builder(SUCCEEDED).outputs(outputs).build()

beforeGroup {
tasks.forEach { whenever(it.extensionClass) doReturn it::class.java }
taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage }
taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult }
whenever(task.execute(any())) doReturn taskResult
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
}

afterGroup(::resetMocks)

action("the handler receives a message") {
subject.handle(message)
}

it("does not write stageTimeoutMs to outputs") {
verify(repository).storeStage(
check {
assertThat(it.outputs)
.containsKey("foo")
.doesNotContainKey("stageTimeoutMs")
}
)
}
}
}

describe("that is not yet complete") {
val pipeline = pipeline {
stage {
Expand Down Expand Up @@ -594,8 +730,8 @@ object RunTaskHandlerTest : SubjectSpek<RunTaskHandler>({
}

it("does not execute the task") {
verify(task).aliases()
verify(task).extensionClass
verify(task, times(2)).aliases()
verify(task, times(2)).extensionClass
verifyNoMoreInteractions(task)
}
}
Expand Down Expand Up @@ -678,8 +814,8 @@ object RunTaskHandlerTest : SubjectSpek<RunTaskHandler>({
}

it("does not execute the task") {
verify(task).aliases()
verify(task).extensionClass
verify(task, times(2)).aliases()
verify(task, times(2)).extensionClass
verifyNoMoreInteractions(task)
}
}
Expand Down Expand Up @@ -1602,8 +1738,8 @@ object RunTaskHandlerTest : SubjectSpek<RunTaskHandler>({
}

it("does not run any tasks") {
verify(task, times(2)).aliases()
verify(task, times(2)).extensionClass
verify(task, times(3)).aliases()
verify(task, times(3)).extensionClass
verifyNoMoreInteractions(task)
}

Expand Down

0 comments on commit 59c995f

Please sign in to comment.