Skip to content

Commit

Permalink
feat(saga): Add support for re-entrance of sagas (#4019)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert committed Sep 14, 2019
1 parent b03a28a commit 515528e
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ abstract class AbstractSagaTest : JUnit5Minutests {
ShouldBranchPredicate::class.java
)
}
registerBeans(applicationContext, *options.registerTypes.toTypedArray())

sagaService = SagaService(sagaRepository, NoopRegistry()).apply {
setApplicationContext(applicationContext)
Expand All @@ -60,10 +61,12 @@ abstract class AbstractSagaTest : JUnit5Minutests {
/**
* @param mockSaga Whether or not to use mockk for the [SagaRepository] or the [TestingSagaRepository]
* @param registerDefaultTestTypes Whether or not to register the canned test types for "autowiring"
* @param registerTypes Types to register (additive if [registerDefaultTestTypes] is true)
*/
open inner class FixtureOptions(
val mockSaga: Boolean = false,
val registerDefaultTestTypes: Boolean = true
val registerDefaultTestTypes: Boolean = true,
val registerTypes: List<Class<*>> = listOf()
)

protected fun registerBeans(applicationContext: ApplicationContext, vararg clazz: Class<*>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package com.netflix.spinnaker.clouddriver.saga
import com.fasterxml.jackson.annotation.JsonTypeName
import com.netflix.spinnaker.clouddriver.saga.flow.SagaAction
import com.netflix.spinnaker.clouddriver.saga.models.Saga
import org.springframework.core.Ordered.HIGHEST_PRECEDENCE
import org.springframework.core.annotation.Order
import java.util.function.Predicate

@JsonTypeName("shouldBranch")
Expand Down Expand Up @@ -47,18 +49,21 @@ class Action1 : SagaAction<DoAction1> {
}
}

@Order(HIGHEST_PRECEDENCE)
class Action2 : SagaAction<DoAction2> {
override fun apply(command: DoAction2, saga: Saga): SagaAction.Result {
return SagaAction.Result(null, listOf())
}
}

@Order(HIGHEST_PRECEDENCE)
class Action3 : SagaAction<DoAction3> {
override fun apply(command: DoAction3, saga: Saga): SagaAction.Result {
return SagaAction.Result(null, listOf())
}
}

@Order(HIGHEST_PRECEDENCE)
class ShouldBranchPredicate : Predicate<Saga> {
override fun test(t: Saga): Boolean =
t.getEvents().filterIsInstance<ShouldBranch>().isNotEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.netflix.spinnaker.clouddriver.saga

import com.fasterxml.jackson.annotation.JsonTypeName
import com.google.common.annotations.Beta
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.clouddriver.saga.exceptions.SagaIntegrationException
Expand Down Expand Up @@ -192,12 +191,6 @@ class SagaService(
throw SagaSystemException("Resolved next action is not a SagaCommand: ${rawClass.simpleName}")
}

/**
* TODO(rz): Do we want our own annotation instead of relying on [JsonTypeName]?
*/
private fun getStepCommandName(command: SagaCommand): String =
command.javaClass.getAnnotation(JsonTypeName::class.java)?.value ?: command.javaClass.simpleName

override fun setApplicationContext(applicationContext: ApplicationContext) {
this.applicationContext = applicationContext
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ abstract class AbstractSagaEvent : AbstractSpinnakerEvent(), SagaEvent
* This event does not attempt to find a difference in state, trading off persistence verbosity for a little bit
* of a simpler implementation.
*
* @param saga The [Saga]'s newly saved state
* @param sequence The [Saga]'s latest sequence
*/
@JsonTypeName("sagaSaved")
class SagaSaved(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package com.netflix.spinnaker.clouddriver.saga.flow

import com.netflix.spinnaker.clouddriver.saga.SagaCommand
import com.netflix.spinnaker.clouddriver.saga.SagaCommandCompleted
import com.netflix.spinnaker.clouddriver.saga.exceptions.SagaNotFoundException
import com.netflix.spinnaker.clouddriver.saga.exceptions.SagaSystemException
import com.netflix.spinnaker.clouddriver.saga.flow.seekers.SagaCommandCompletedEventSeeker
import com.netflix.spinnaker.clouddriver.saga.flow.seekers.SagaCommandEventSeeker
import com.netflix.spinnaker.clouddriver.saga.models.Saga
import com.netflix.spinnaker.clouddriver.saga.persistence.SagaRepository
import com.netflix.spinnaker.kork.exceptions.SystemException
Expand Down Expand Up @@ -48,6 +51,7 @@ class SagaFlowIterator(
private val context = Context(saga.name, saga.id)

private var index: Int = 0
private var seeked: Boolean = false

// toList.toMutableList copies the list so while we mutate stuff, it's all internal
private var steps = flow.steps.toList().toMutableList()
Expand All @@ -59,10 +63,21 @@ class SagaFlowIterator(
return false
}

// The iterator needs the latest state of a saga to correctly determine in the next step to take.
// This is kind of handy, since we can pass this newly refreshed state straight to the iterator consumer so they
// don't need to concern themselves with that.
latestSaga = sagaRepository.get(context.sagaName, context.sagaId)
?: throw SagaNotFoundException("Could not find Saga (${context.sagaName}/${context.sagaId} for flow traversal")

// To support resuming sagas, we want to seek to the next step that has not been processed,
// which may not be the first step
seekToNextStep(latestSaga)

val nextStep = steps[index]

// If the next step is a condition, try to wire it up from the [ApplicationContext] and run it against the latest
// state. If the predicate is true, its nested [SagaFlow] will be injected into the current steps list, replacing
// the condition step's location. If the condition is false, then we just remove the step from the list.
if (nextStep is SagaFlow.ConditionStep) {
val predicate = try {
applicationContext.getBean(nextStep.predicate)
Expand All @@ -81,6 +96,29 @@ class SagaFlowIterator(
return index < steps.size
}

/**
* Seeks the iterator to the next step that needs to be (re)started, if the saga has already begun.
*
* Multiple strategies are used to locate the correct index to seek to. The highest index returned from the [Seeker]
* strategies will be used for seeking.
*
* TODO(rz): What if there is more than 1 of a particular command in a flow? :thinking_face: May need more metadata
* in the [SagaCommandCompleted] event passed along...
*/
private fun seekToNextStep(saga: Saga) {
if (seeked) {
// We only want to seek once
return
}
seeked = true

index = listOf(SagaCommandCompletedEventSeeker(), SagaCommandEventSeeker())
.mapNotNull { it.invoke(index, steps, saga)?.coerceAtLeast(0) }
.max()
?: index
log.info("Seeking to step index $index")
}

override fun next(): IteratorState {
val step = steps[index]
if (step !is SagaFlow.ActionStep) {
Expand Down Expand Up @@ -130,3 +168,11 @@ class SagaFlowIterator(
val sagaId: String
)
}

/**
* Allows multiple strategies to be used to locate the correct starting point for the [SagaFlowIterator].
*
* If a Seeker cannot determine an index, null should be returned. If multiple Seekers return an index, the
* highest value will be used.
*/
internal typealias Seeker = (currentIndex: Int, steps: List<SagaFlow.Step>, saga: Saga) -> Int?
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.saga.flow.seekers

import com.netflix.spinnaker.clouddriver.saga.SagaCommandCompleted
import com.netflix.spinnaker.clouddriver.saga.flow.SagaFlow
import com.netflix.spinnaker.clouddriver.saga.flow.Seeker
import com.netflix.spinnaker.clouddriver.saga.flow.convertActionStepToCommandName
import com.netflix.spinnaker.clouddriver.saga.models.Saga
import org.slf4j.LoggerFactory

/**
* Seeks the [SagaFlowIterator] index to the next command following a [SagaCommandCompleted] event.
*/
internal class SagaCommandCompletedEventSeeker : Seeker {

private val log by lazy { LoggerFactory.getLogger(javaClass) }

override fun invoke(currentIndex: Int, steps: List<SagaFlow.Step>, saga: Saga): Int? {
val completionEvents = saga.getEvents().filterIsInstance<SagaCommandCompleted>()
if (completionEvents.isEmpty()) {
// If there are no completion events, we don't need to seek at all.
return null
}

val lastCompletedCommand = completionEvents.last().command
val step = steps
.filterIsInstance<SagaFlow.ActionStep>()
.find { convertActionStepToCommandName(it) == lastCompletedCommand }

if (step == null) {
// Not the end of the world if this seeker doesn't find a correlated step, but it's definitely an error case
log.error("Could not find step associated with last completed command ($lastCompletedCommand)")
return null
}

return (steps.indexOf(step) + 1).also {
log.debug("Suggesting to seek index to $it")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.saga.flow.seekers

import com.netflix.spinnaker.clouddriver.saga.SagaCommand
import com.netflix.spinnaker.clouddriver.saga.flow.SagaFlow
import com.netflix.spinnaker.clouddriver.saga.flow.Seeker
import com.netflix.spinnaker.clouddriver.saga.flow.convertActionStepToCommandClass
import com.netflix.spinnaker.clouddriver.saga.models.Saga
import org.slf4j.LoggerFactory

/**
* Seeks the [SagaFlowIterator] index to the next incomplete, but committed [SagaCommand].
*/
internal class SagaCommandEventSeeker : Seeker {

private val log by lazy { LoggerFactory.getLogger(javaClass) }

override fun invoke(currentIndex: Int, steps: List<SagaFlow.Step>, saga: Saga): Int? {
val commands = saga.getEvents().filterIsInstance<SagaCommand>()
if (commands.isEmpty()) {
// No commands, nothing to seek to
return null
}

val lastCommand = commands.last()
val step = steps
.filterIsInstance<SagaFlow.ActionStep>()
.find { convertActionStepToCommandClass(it) == lastCommand }

if (step == null) {
log.error("Could not find step associated with last incomplete command ($lastCommand)")
return null
}

return (steps.indexOf(step) + 1).also {
log.debug("Suggesting to seek index to $it")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.saga.flow

import com.netflix.spinnaker.clouddriver.saga.SagaCommand
import com.netflix.spinnaker.clouddriver.saga.exceptions.SagaSystemException
import com.netflix.spinnaker.clouddriver.saga.getStepCommandName
import org.springframework.core.ResolvableType

/**
* Derives a [SagaCommand] name from a [SagaFlow.ActionStep].
*/
internal fun convertActionStepToCommandName(step: SagaFlow.ActionStep): String =
getStepCommandName(convertActionStepToCommandClass(step))

/**
* Derives a [SagaCommand] Class from a [SagaFlow.ActionStep].
*/
internal fun convertActionStepToCommandClass(step: SagaFlow.ActionStep): Class<SagaCommand> {
val actionType = ResolvableType.forClass(step.action)
.also { it.resolve() }

val commandType = actionType.interfaces
.find { SagaAction::class.java.isAssignableFrom(it.rawClass!!) }
?.getGeneric(0)
?: throw SagaSystemException("Could not resolve SagaCommand type from ActionStep: $step")

@Suppress("UNCHECKED_CAST")
return commandType.rawClass as Class<SagaCommand>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.clouddriver.saga

import com.fasterxml.jackson.annotation.JsonTypeName

/**
* Get the name of the provided [command] instance.
*
*/
internal fun getStepCommandName(command: SagaCommand): String =
getStepCommandName(command.javaClass)

/**
* Get the name of the provided [commandClass].
*
* TODO(rz): Do we want our own annotation instead of relying on [JsonTypeName]?
*/
internal fun getStepCommandName(commandClass: Class<SagaCommand>): String =
commandClass.getAnnotation(JsonTypeName::class.java)?.value ?: commandClass.simpleName
Loading

0 comments on commit 515528e

Please sign in to comment.