diff --git a/clouddriver-saga-test/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/AbstractSagaTest.kt b/clouddriver-saga-test/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/AbstractSagaTest.kt index 6b442ea88d6..72ec3a7b5f0 100644 --- a/clouddriver-saga-test/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/AbstractSagaTest.kt +++ b/clouddriver-saga-test/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/AbstractSagaTest.kt @@ -50,6 +50,7 @@ abstract class AbstractSagaTest : JUnit5Minutests { ShouldBranchPredicate::class.java ) } + registerBeans(applicationContext, *options.registerTypes.toTypedArray()) sagaService = SagaService(sagaRepository, NoopRegistry()).apply { setApplicationContext(applicationContext) @@ -60,10 +61,12 @@ abstract class AbstractSagaTest : JUnit5Minutests { /** * @param mockSaga Whether or not to use mockk for the [SagaRepository] or the [TestingSagaRepository] * @param registerDefaultTestTypes Whether or not to register the canned test types for "autowiring" + * @param registerTypes Types to register (additive if [registerDefaultTestTypes] is true) */ open inner class FixtureOptions( val mockSaga: Boolean = false, - val registerDefaultTestTypes: Boolean = true + val registerDefaultTestTypes: Boolean = true, + val registerTypes: List> = listOf() ) protected fun registerBeans(applicationContext: ApplicationContext, vararg clazz: Class<*>) { diff --git a/clouddriver-saga-test/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/types.kt b/clouddriver-saga-test/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/types.kt index 4edc33da699..eb9aea014fd 100644 --- a/clouddriver-saga-test/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/types.kt +++ b/clouddriver-saga-test/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/types.kt @@ -18,6 +18,8 @@ package com.netflix.spinnaker.clouddriver.saga import com.fasterxml.jackson.annotation.JsonTypeName import com.netflix.spinnaker.clouddriver.saga.flow.SagaAction import com.netflix.spinnaker.clouddriver.saga.models.Saga +import org.springframework.core.Ordered.HIGHEST_PRECEDENCE +import org.springframework.core.annotation.Order import java.util.function.Predicate @JsonTypeName("shouldBranch") @@ -47,18 +49,21 @@ class Action1 : SagaAction { } } +@Order(HIGHEST_PRECEDENCE) class Action2 : SagaAction { override fun apply(command: DoAction2, saga: Saga): SagaAction.Result { return SagaAction.Result(null, listOf()) } } +@Order(HIGHEST_PRECEDENCE) class Action3 : SagaAction { override fun apply(command: DoAction3, saga: Saga): SagaAction.Result { return SagaAction.Result(null, listOf()) } } +@Order(HIGHEST_PRECEDENCE) class ShouldBranchPredicate : Predicate { override fun test(t: Saga): Boolean = t.getEvents().filterIsInstance().isNotEmpty() diff --git a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaService.kt b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaService.kt index 13adeebef42..69cf87f0ea1 100644 --- a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaService.kt +++ b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaService.kt @@ -15,7 +15,6 @@ */ package com.netflix.spinnaker.clouddriver.saga -import com.fasterxml.jackson.annotation.JsonTypeName import com.google.common.annotations.Beta import com.netflix.spectator.api.Registry import com.netflix.spinnaker.clouddriver.saga.exceptions.SagaIntegrationException @@ -192,12 +191,6 @@ class SagaService( throw SagaSystemException("Resolved next action is not a SagaCommand: ${rawClass.simpleName}") } - /** - * TODO(rz): Do we want our own annotation instead of relying on [JsonTypeName]? - */ - private fun getStepCommandName(command: SagaCommand): String = - command.javaClass.getAnnotation(JsonTypeName::class.java)?.value ?: command.javaClass.simpleName - override fun setApplicationContext(applicationContext: ApplicationContext) { this.applicationContext = applicationContext } diff --git a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/events.kt b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/events.kt index ab9d3175895..32ea4300208 100644 --- a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/events.kt +++ b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/events.kt @@ -37,7 +37,7 @@ abstract class AbstractSagaEvent : AbstractSpinnakerEvent(), SagaEvent * This event does not attempt to find a difference in state, trading off persistence verbosity for a little bit * of a simpler implementation. * - * @param saga The [Saga]'s newly saved state + * @param sequence The [Saga]'s latest sequence */ @JsonTypeName("sagaSaved") class SagaSaved( diff --git a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaFlowIterator.kt b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaFlowIterator.kt index 5efd084404a..bd19e5206d6 100644 --- a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaFlowIterator.kt +++ b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaFlowIterator.kt @@ -16,8 +16,11 @@ package com.netflix.spinnaker.clouddriver.saga.flow import com.netflix.spinnaker.clouddriver.saga.SagaCommand +import com.netflix.spinnaker.clouddriver.saga.SagaCommandCompleted import com.netflix.spinnaker.clouddriver.saga.exceptions.SagaNotFoundException import com.netflix.spinnaker.clouddriver.saga.exceptions.SagaSystemException +import com.netflix.spinnaker.clouddriver.saga.flow.seekers.SagaCommandCompletedEventSeeker +import com.netflix.spinnaker.clouddriver.saga.flow.seekers.SagaCommandEventSeeker import com.netflix.spinnaker.clouddriver.saga.models.Saga import com.netflix.spinnaker.clouddriver.saga.persistence.SagaRepository import com.netflix.spinnaker.kork.exceptions.SystemException @@ -48,6 +51,7 @@ class SagaFlowIterator( private val context = Context(saga.name, saga.id) private var index: Int = 0 + private var seeked: Boolean = false // toList.toMutableList copies the list so while we mutate stuff, it's all internal private var steps = flow.steps.toList().toMutableList() @@ -59,10 +63,21 @@ class SagaFlowIterator( return false } + // The iterator needs the latest state of a saga to correctly determine in the next step to take. + // This is kind of handy, since we can pass this newly refreshed state straight to the iterator consumer so they + // don't need to concern themselves with that. latestSaga = sagaRepository.get(context.sagaName, context.sagaId) ?: throw SagaNotFoundException("Could not find Saga (${context.sagaName}/${context.sagaId} for flow traversal") + // To support resuming sagas, we want to seek to the next step that has not been processed, + // which may not be the first step + seekToNextStep(latestSaga) + val nextStep = steps[index] + + // If the next step is a condition, try to wire it up from the [ApplicationContext] and run it against the latest + // state. If the predicate is true, its nested [SagaFlow] will be injected into the current steps list, replacing + // the condition step's location. If the condition is false, then we just remove the step from the list. if (nextStep is SagaFlow.ConditionStep) { val predicate = try { applicationContext.getBean(nextStep.predicate) @@ -81,6 +96,29 @@ class SagaFlowIterator( return index < steps.size } + /** + * Seeks the iterator to the next step that needs to be (re)started, if the saga has already begun. + * + * Multiple strategies are used to locate the correct index to seek to. The highest index returned from the [Seeker] + * strategies will be used for seeking. + * + * TODO(rz): What if there is more than 1 of a particular command in a flow? :thinking_face: May need more metadata + * in the [SagaCommandCompleted] event passed along... + */ + private fun seekToNextStep(saga: Saga) { + if (seeked) { + // We only want to seek once + return + } + seeked = true + + index = listOf(SagaCommandCompletedEventSeeker(), SagaCommandEventSeeker()) + .mapNotNull { it.invoke(index, steps, saga)?.coerceAtLeast(0) } + .max() + ?: index + log.info("Seeking to step index $index") + } + override fun next(): IteratorState { val step = steps[index] if (step !is SagaFlow.ActionStep) { @@ -130,3 +168,11 @@ class SagaFlowIterator( val sagaId: String ) } + +/** + * Allows multiple strategies to be used to locate the correct starting point for the [SagaFlowIterator]. + * + * If a Seeker cannot determine an index, null should be returned. If multiple Seekers return an index, the + * highest value will be used. + */ +internal typealias Seeker = (currentIndex: Int, steps: List, saga: Saga) -> Int? diff --git a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/seekers/SagaCommandCompletedEventSeeker.kt b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/seekers/SagaCommandCompletedEventSeeker.kt new file mode 100644 index 00000000000..bca0524f1b5 --- /dev/null +++ b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/seekers/SagaCommandCompletedEventSeeker.kt @@ -0,0 +1,54 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.saga.flow.seekers + +import com.netflix.spinnaker.clouddriver.saga.SagaCommandCompleted +import com.netflix.spinnaker.clouddriver.saga.flow.SagaFlow +import com.netflix.spinnaker.clouddriver.saga.flow.Seeker +import com.netflix.spinnaker.clouddriver.saga.flow.convertActionStepToCommandName +import com.netflix.spinnaker.clouddriver.saga.models.Saga +import org.slf4j.LoggerFactory + +/** + * Seeks the [SagaFlowIterator] index to the next command following a [SagaCommandCompleted] event. + */ +internal class SagaCommandCompletedEventSeeker : Seeker { + + private val log by lazy { LoggerFactory.getLogger(javaClass) } + + override fun invoke(currentIndex: Int, steps: List, saga: Saga): Int? { + val completionEvents = saga.getEvents().filterIsInstance() + if (completionEvents.isEmpty()) { + // If there are no completion events, we don't need to seek at all. + return null + } + + val lastCompletedCommand = completionEvents.last().command + val step = steps + .filterIsInstance() + .find { convertActionStepToCommandName(it) == lastCompletedCommand } + + if (step == null) { + // Not the end of the world if this seeker doesn't find a correlated step, but it's definitely an error case + log.error("Could not find step associated with last completed command ($lastCompletedCommand)") + return null + } + + return (steps.indexOf(step) + 1).also { + log.debug("Suggesting to seek index to $it") + } + } +} diff --git a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/seekers/SagaCommandEventSeeker.kt b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/seekers/SagaCommandEventSeeker.kt new file mode 100644 index 00000000000..f85968cbe03 --- /dev/null +++ b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/seekers/SagaCommandEventSeeker.kt @@ -0,0 +1,53 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.saga.flow.seekers + +import com.netflix.spinnaker.clouddriver.saga.SagaCommand +import com.netflix.spinnaker.clouddriver.saga.flow.SagaFlow +import com.netflix.spinnaker.clouddriver.saga.flow.Seeker +import com.netflix.spinnaker.clouddriver.saga.flow.convertActionStepToCommandClass +import com.netflix.spinnaker.clouddriver.saga.models.Saga +import org.slf4j.LoggerFactory + +/** + * Seeks the [SagaFlowIterator] index to the next incomplete, but committed [SagaCommand]. + */ +internal class SagaCommandEventSeeker : Seeker { + + private val log by lazy { LoggerFactory.getLogger(javaClass) } + + override fun invoke(currentIndex: Int, steps: List, saga: Saga): Int? { + val commands = saga.getEvents().filterIsInstance() + if (commands.isEmpty()) { + // No commands, nothing to seek to + return null + } + + val lastCommand = commands.last() + val step = steps + .filterIsInstance() + .find { convertActionStepToCommandClass(it) == lastCommand } + + if (step == null) { + log.error("Could not find step associated with last incomplete command ($lastCommand)") + return null + } + + return (steps.indexOf(step) + 1).also { + log.debug("Suggesting to seek index to $it") + } + } +} diff --git a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/util.kt b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/util.kt new file mode 100644 index 00000000000..04c3636723b --- /dev/null +++ b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/util.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.saga.flow + +import com.netflix.spinnaker.clouddriver.saga.SagaCommand +import com.netflix.spinnaker.clouddriver.saga.exceptions.SagaSystemException +import com.netflix.spinnaker.clouddriver.saga.getStepCommandName +import org.springframework.core.ResolvableType + +/** + * Derives a [SagaCommand] name from a [SagaFlow.ActionStep]. + */ +internal fun convertActionStepToCommandName(step: SagaFlow.ActionStep): String = + getStepCommandName(convertActionStepToCommandClass(step)) + +/** + * Derives a [SagaCommand] Class from a [SagaFlow.ActionStep]. + */ +internal fun convertActionStepToCommandClass(step: SagaFlow.ActionStep): Class { + val actionType = ResolvableType.forClass(step.action) + .also { it.resolve() } + + val commandType = actionType.interfaces + .find { SagaAction::class.java.isAssignableFrom(it.rawClass!!) } + ?.getGeneric(0) + ?: throw SagaSystemException("Could not resolve SagaCommand type from ActionStep: $step") + + @Suppress("UNCHECKED_CAST") + return commandType.rawClass as Class +} diff --git a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/util.kt b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/util.kt new file mode 100644 index 00000000000..69d2511873d --- /dev/null +++ b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/util.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.clouddriver.saga + +import com.fasterxml.jackson.annotation.JsonTypeName + +/** + * Get the name of the provided [command] instance. + * + */ +internal fun getStepCommandName(command: SagaCommand): String = + getStepCommandName(command.javaClass) + +/** + * Get the name of the provided [commandClass]. + * + * TODO(rz): Do we want our own annotation instead of relying on [JsonTypeName]? + */ +internal fun getStepCommandName(commandClass: Class): String = + commandClass.getAnnotation(JsonTypeName::class.java)?.value ?: commandClass.simpleName diff --git a/clouddriver-saga/src/test/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaServiceTest.kt b/clouddriver-saga/src/test/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaServiceTest.kt index cc38b997059..a1c861fcc93 100644 --- a/clouddriver-saga/src/test/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaServiceTest.kt +++ b/clouddriver-saga/src/test/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaServiceTest.kt @@ -15,7 +15,9 @@ */ package com.netflix.spinnaker.clouddriver.saga +import com.netflix.spinnaker.clouddriver.saga.flow.SagaAction import com.netflix.spinnaker.clouddriver.saga.flow.SagaFlow +import com.netflix.spinnaker.clouddriver.saga.models.Saga import dev.minutest.rootContext import strikt.api.expectThat import strikt.assertions.contains @@ -52,5 +54,83 @@ class SagaServiceTest : AbstractSagaTest() { } } } + + context("re-entrance") { + fixture { + ReentranceFixture() + } + + mapOf( + "completed doAction1" to listOf( + DoAction1(), + SagaCommandCompleted("doAction1") + ), + "completed doAction1, doAction2 incomplete" to listOf( + DoAction1(), + SagaCommandCompleted("doAction1"), + DoAction2() + ) + ).forEach { (name, previousEvents) -> + test("a saga resumes where it left off: $name") { + val flow = SagaFlow() + .then(ReentrantAction1::class.java) + .then(ReentrantAction2::class.java) + .then(ReentrantAction3::class.java) + + // We've already done some of the work. + sagaRepository.save(saga, previousEvents) + + // Apply the saga "again" + sagaService.applyBlocking("test", "test", flow, DoAction1()) + + val saga = sagaRepository.get("test", "test") + expectThat(saga) + .describedAs(name) + .isNotNull() + .and { + get { getEvents() }.filterIsInstance().map { it.javaClass.simpleName }.containsExactly( + "DoAction1", + "DoAction2", + "DoAction3" + ) + get { getEvents() }.filterIsInstance().map { it.command }.containsExactly( + "doAction1", + "doAction2", + "doAction3" + ) + get { getEvents() }.map { it.javaClass }.contains(SagaCompleted::class.java) + } + } + } + } + } + + private inner class ReentranceFixture : BaseSagaFixture() { + init { + registerBeans( + applicationContext, + ReentrantAction1::class.java, + ReentrantAction2::class.java, + ReentrantAction3::class.java + ) + } + } +} + +private class ReentrantAction1 : SagaAction { + override fun apply(command: DoAction1, saga: Saga): SagaAction.Result { + return SagaAction.Result(DoAction2()) + } +} + +private class ReentrantAction2 : SagaAction { + override fun apply(command: DoAction2, saga: Saga): SagaAction.Result { + return SagaAction.Result(DoAction3()) + } +} + +private class ReentrantAction3 : SagaAction { + override fun apply(command: DoAction3, saga: Saga): SagaAction.Result { + return SagaAction.Result() } } diff --git a/clouddriver-saga/src/test/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaFlowIteratorTest.kt b/clouddriver-saga/src/test/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaFlowIteratorTest.kt index 3345898d7dd..00fbaf336a1 100644 --- a/clouddriver-saga/src/test/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaFlowIteratorTest.kt +++ b/clouddriver-saga/src/test/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaFlowIteratorTest.kt @@ -20,6 +20,7 @@ import com.netflix.spinnaker.clouddriver.saga.AbstractSagaTest import com.netflix.spinnaker.clouddriver.saga.Action1 import com.netflix.spinnaker.clouddriver.saga.Action2 import com.netflix.spinnaker.clouddriver.saga.Action3 +import com.netflix.spinnaker.clouddriver.saga.SagaCommandCompleted import com.netflix.spinnaker.clouddriver.saga.ShouldBranch import com.netflix.spinnaker.clouddriver.saga.ShouldBranchPredicate import dev.minutest.rootContext @@ -56,6 +57,18 @@ class SagaFlowIteratorTest : AbstractSagaTest() { that(subject.hasNext()).isFalse() } } + + test("seeks iterator with partially applied saga") { + saga.addEventForTest(SagaCommandCompleted("doAction1")) + saga.addEventForTest(ShouldBranch()) + + expect { + that(subject.hasNext()).isTrue() + that(subject.next()).get { action }.isA() + that(subject.next()).get { action }.isA() + that(subject.hasNext()).isFalse() + } + } } private inner class Fixture : BaseSagaFixture() { diff --git a/clouddriver-titus/src/main/groovy/com/netflix/spinnaker/clouddriver/titus/deploy/actions/LoadFront50App.java b/clouddriver-titus/src/main/groovy/com/netflix/spinnaker/clouddriver/titus/deploy/actions/LoadFront50App.java index 00f339f65f3..e60d2255105 100644 --- a/clouddriver-titus/src/main/groovy/com/netflix/spinnaker/clouddriver/titus/deploy/actions/LoadFront50App.java +++ b/clouddriver-titus/src/main/groovy/com/netflix/spinnaker/clouddriver/titus/deploy/actions/LoadFront50App.java @@ -124,11 +124,11 @@ public Result apply(@NotNull LoadFront50AppCommand command, @NotNull Saga saga) "Failed to convert front50 application to internal model", e); } } catch (Exception e) { - log.error("Failed to load front50 application attributes for {}", command.getAppName(), e); if (command.isAllowMissing()) { // It's ok to not load the front50 application return new Result(command.nextCommand, Collections.emptyList()); } + log.error("Failed to load front50 application attributes for {}", command.getAppName(), e); throw new SystemException( format("Failed to load front50 application: %s", command.getAppName()), e); }