Skip to content

Commit

Permalink
fix(titus): Additional serialization issues from SQL backend (#4015)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert committed Sep 10, 2019
1 parent 59f69b6 commit 9d4d5cd
Show file tree
Hide file tree
Showing 18 changed files with 143 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ package com.netflix.spinnaker.clouddriver.event
import com.fasterxml.jackson.annotation.JsonIgnore
import com.netflix.spinnaker.clouddriver.event.exceptions.UninitializedEventException

/**
* WARNING: Do not use this base class with Lombok events, you will have a bad time! Only use in Kotlin classes.
* For some reason, Lombok / Jackson can't find methods to deserialize, so the Java classes have to implement the
* interface directly. I'm not sure if this is a result of writing in Kotlin, or an issue in Lombok and/or Jackson.
*/
abstract class AbstractSpinnakerEvent : SpinnakerEvent {
/**
* Not a lateinit to make Java/Lombok & Jackson compatibility a little easier, although behavior is exactly the same.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.spinnaker.clouddriver.event

import com.fasterxml.jackson.annotation.JsonGetter
import com.fasterxml.jackson.annotation.JsonSetter
import com.fasterxml.jackson.annotation.JsonTypeInfo

/**
Expand All @@ -26,7 +28,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo
property = "eventType"
)
interface SpinnakerEvent {
@JsonGetter
fun getMetadata(): EventMetadata

@JsonSetter
fun setMetadata(eventMetadata: EventMetadata)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ import com.netflix.spinnaker.clouddriver.saga.models.Saga
import java.util.function.Predicate

@JsonTypeName("shouldBranch")
class ShouldBranch : SagaEvent()
class ShouldBranch : AbstractSagaEvent()

@JsonTypeName("doAction1")
class DoAction1(
val branch: Boolean = true
) : SagaCommand()
) : AbstractSagaEvent(), SagaCommand

@JsonTypeName("doAction2")
class DoAction2 : SagaCommand()
class DoAction2 : AbstractSagaEvent(), SagaCommand

@JsonTypeName("doAction3")
class DoAction3 : SagaCommand()
class DoAction3 : AbstractSagaEvent(), SagaCommand

class Action1 : SagaAction<DoAction1> {
override fun apply(command: DoAction1, saga: Saga): SagaAction.Result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,21 @@
*/
package com.netflix.spinnaker.clouddriver.saga

import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonTypeName
import com.netflix.spinnaker.clouddriver.event.AbstractSpinnakerEvent
import com.netflix.spinnaker.clouddriver.event.SpinnakerEvent
import com.netflix.spinnaker.clouddriver.saga.models.Saga
import com.netflix.spinnaker.kork.exceptions.SpinnakerException

/**
* Root event type for [Saga]s.
*
* @property sagaName Alias for [metadata.aggregateType]
* @property sagaId Alias for [metadata.aggregateId]
*/
abstract class SagaEvent : AbstractSpinnakerEvent() {
val sagaName
@JsonIgnore get() = getMetadata().aggregateType
interface SagaEvent : SpinnakerEvent

val sagaId
@JsonIgnore get() = getMetadata().aggregateId
}
/**
* Warning: Do not use with Lombok @Value classes.
*/
abstract class AbstractSagaEvent : AbstractSpinnakerEvent(), SagaEvent

/**
* Emitted whenever a [Saga] is saved.
Expand All @@ -46,7 +42,7 @@ abstract class SagaEvent : AbstractSpinnakerEvent() {
@JsonTypeName("sagaSaved")
class SagaSaved(
val sequence: Long
) : SagaEvent()
) : AbstractSagaEvent()

/**
* Emitted whenever an internal error has occurred while applying a [Saga].
Expand All @@ -62,7 +58,7 @@ class SagaInternalErrorOccurred(
val error: Exception? = null,
val retryable: Boolean = true,
val data: Map<String, String> = mapOf()
) : SagaEvent()
) : AbstractSagaEvent()

/**
* Emitted whenever an error has occurred within a [SagaAction] while applying a [Saga].
Expand All @@ -76,7 +72,7 @@ class SagaActionErrorOccurred(
val actionName: String,
val error: Exception,
val retryable: Boolean
) : SagaEvent()
) : AbstractSagaEvent()

/**
* Informational log that can be added to a [Saga] for end-user feedback, as well as operational insight.
Expand All @@ -89,7 +85,7 @@ class SagaActionErrorOccurred(
class SagaLogAppended(
val message: Message,
val diagnostics: Diagnostics? = null
) : SagaEvent() {
) : AbstractSagaEvent() {

/**
* @param user An end-user friendly message
Expand All @@ -116,29 +112,29 @@ class SagaLogAppended(
@JsonTypeName("sagaCompleted")
class SagaCompleted(
val success: Boolean
) : SagaEvent()
) : AbstractSagaEvent()

/**
* Emitted when a [Saga] enters a rollback state.
*/
@JsonTypeName("sagaRollbackStarted")
class SagaRollbackStarted : SagaEvent()
class SagaRollbackStarted : AbstractSagaEvent()

/**
* Emitted when all rollback actions for a [Saga] have been applied.
*/
@JsonTypeName("sagaRollbackCompleted")
class SagaRollbackCompleted : SagaEvent()
class SagaRollbackCompleted : AbstractSagaEvent()

/**
* The root event type for all mutating [Saga] operations.
*/
abstract class SagaCommand : SagaEvent()
interface SagaCommand : SagaEvent

/**
* The root event type for all [Saga] rollback operations.
*/
abstract class SagaRollbackCommand : SagaCommand()
interface SagaRollbackCommand : SagaCommand

/**
* Marker event for recording that the work associated with a particular [SagaCommand] event has been completed.
Expand All @@ -148,7 +144,7 @@ abstract class SagaRollbackCommand : SagaCommand()
@JsonTypeName("sagaCommandCompleted")
class SagaCommandCompleted(
val command: String
) : SagaEvent() {
) : AbstractSagaEvent() {

fun matches(candidateCommand: Class<out SagaCommand>): Boolean =
candidateCommand.getAnnotation(JsonTypeName::class.java)?.value == command
Expand All @@ -164,6 +160,6 @@ class SagaCommandCompleted(
class ManyCommands(
command1: SagaCommand,
vararg extraCommands: SagaCommand
) : SagaCommand() {
) : AbstractSagaEvent(), SagaCommand {
val commands = listOf(command1).plus(extraCommands)
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class DefaultSagaRepository(
.last()
.let {
Saga(
name = it.sagaName,
id = it.sagaId,
name = it.getMetadata().aggregateType,
id = it.getMetadata().aggregateId,
sequence = it.sequence
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.spinnaker.clouddriver.saga.examples

import com.fasterxml.jackson.annotation.JsonTypeName
import com.netflix.spinnaker.clouddriver.saga.AbstractSagaEvent
import com.netflix.spinnaker.clouddriver.saga.AbstractSagaTest
import com.netflix.spinnaker.clouddriver.saga.ManyCommands
import com.netflix.spinnaker.clouddriver.saga.SagaCommand
Expand Down Expand Up @@ -75,16 +76,16 @@ class BranchingExampleTest : AbstractSagaTest() {
}

@JsonTypeName("prepareForThings")
class PrepareForThings(val doOptionalThings: Boolean) : SagaCommand()
class PrepareForThings(val doOptionalThings: Boolean) : AbstractSagaEvent(), SagaCommand

@JsonTypeName("doTheThing")
class DoTheThing : SagaCommand()
class DoTheThing : AbstractSagaEvent(), SagaCommand

@JsonTypeName("doAnOptionalThing")
class DoAnOptionalThing : SagaCommand()
class DoAnOptionalThing : AbstractSagaEvent(), SagaCommand

@JsonTypeName("finishThings")
class FinishThings : SagaCommand()
class FinishThings : AbstractSagaEvent(), SagaCommand

class PrepareAction : SagaAction<PrepareForThings> {
override fun apply(command: PrepareForThings, saga: Saga): SagaAction.Result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.jooq.Condition
import org.jooq.DSLContext
import org.jooq.impl.DSL.currentTimestamp
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.max
import org.jooq.impl.DSL.table
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationEventPublisher
Expand Down Expand Up @@ -113,9 +114,8 @@ class SqlEventRepository(

// Events have their own auto-incrementing sequence within an aggregate; so we need to get the last sequence
// and generate from there.
val lastSequence = ctx.select(field("sequence")).from(EVENTS_TABLE)
val lastSequence = ctx.select(max(field("sequence"))).from(EVENTS_TABLE)
.where(aggregateCondition)
.orderBy(field("timestamp").desc())
.limit(1)
.fetchOne(0, Long::class.java)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ package com.netflix.spinnaker.clouddriver.sql.event
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.clouddriver.event.Aggregate
import com.netflix.spinnaker.clouddriver.event.CompositeSpinnakerEvent
import com.netflix.spinnaker.clouddriver.event.EventMetadata
import com.netflix.spinnaker.clouddriver.event.SpinnakerEvent
import com.netflix.spinnaker.clouddriver.event.exceptions.InvalidEventTypeException
import org.jooq.Condition
import org.jooq.Record
import org.jooq.Select
import org.jooq.SelectConditionStep
import org.jooq.SelectWhereStep
import org.jooq.SelectJoinStep
import org.jooq.impl.DSL.currentTimestamp
import java.sql.ResultSet

/**
* Adds an arbitrary number of [conditions] to a query joined by `AND` operator.
*/
internal fun <R : Record> SelectWhereStep<R>.withConditions(conditions: List<Condition>): SelectConditionStep<R> {
internal fun <R : Record> SelectJoinStep<R>.withConditions(conditions: List<Condition>): SelectConditionStep<R> {
return if (conditions.isNotEmpty()) this.where(
conditions.reduce { acc, condition -> acc.and(condition) }
) else {
Expand Down Expand Up @@ -68,23 +68,6 @@ internal fun Select<out Record>.fetchAggregates(): List<SqlAggregate> =
}
}

/**
* Maps a jOOQ result set to an [Aggregate] collection.
*/
internal class AggregateMapper {
fun map(rs: ResultSet): Collection<Aggregate> {
val results = mutableListOf<Aggregate>()
while (rs.next()) {
results.add(Aggregate(
rs.getString("aggregate_type"),
rs.getString("aggregate_id"),
rs.getLong("version")
))
}
return results
}
}

/**
* Converts a [SpinnakerEvent] to a SQL event row. The values are ordered the same as the schema's columns.
*/
Expand All @@ -108,9 +91,15 @@ internal fun Select<out Record>.fetchEvents(objectMapper: ObjectMapper): List<Sp
mutableListOf<SpinnakerEvent>().apply {
while (rs.next()) {
try {
add(objectMapper.readValue(rs.getString("data"), SpinnakerEvent::class.java).apply {
val event = objectMapper.readValue(rs.getString("data"), SpinnakerEvent::class.java).apply {
setMetadata(objectMapper.readValue(rs.getString("metadata"), EventMetadata::class.java))
})
}
if (event is CompositeSpinnakerEvent) {
event.getComposedEvents().forEach {
it.setMetadata(event.getMetadata().copy(id = "N/A", sequence = -1))
}
}
add(event)
} catch (e: JsonProcessingException) {
throw InvalidEventTypeException(e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ class SqlEventRepositoryTest : JUnit5Minutests {
}
}

test("events correctly increment sequence across transactions") {
subject.save("agg", "1", 0, listOf(MyEvent("1"), MyEvent("2")))
subject.save("agg", "1", 1, listOf(MyEvent("3"), MyEvent("4")))

expectThat(subject.list("agg", "1"))
.get { map { it.getMetadata().sequence } }
.isA<List<Long>>()
.containsExactly(1, 2, 3, 4)
}

context("listing aggregates") {
fun Fixture.setupAggregates() {
subject.save("foo", "1", 0, listOf(MyEvent("hi foo")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.spinnaker.clouddriver.titus.client.model;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import java.util.ArrayList;
Expand All @@ -25,7 +26,6 @@
import java.util.Map;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.Value;
import lombok.experimental.Wither;

Expand All @@ -34,7 +34,9 @@
@Wither
@Value
public class SubmitJobRequest {
@Getter

@JsonDeserialize(builder = Constraint.ConstraintBuilder.class)
@Builder(builderClassName = "ConstraintBuilder", toBuilder = true)
@Value
public static class Constraint {
enum ConstraintType {
Expand All @@ -53,8 +55,11 @@ public static Constraint soft(String constraint) {
return new Constraint(ConstraintType.SOFT, constraint);
}

private final ConstraintType constraintType;
private final String constraint;
@JsonProperty private final ConstraintType constraintType;
@JsonProperty private final String constraint;

@JsonPOJOBuilder(withPrefix = "")
public static class ConstraintBuilder {}
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.netflix.spinnaker.clouddriver.aws.deploy.ops.loadbalancer.TargetGroupLookupHelper;
import com.netflix.spinnaker.clouddriver.event.EventMetadata;
import com.netflix.spinnaker.clouddriver.saga.SagaCommand;
import com.netflix.spinnaker.clouddriver.saga.flow.SagaAction;
import com.netflix.spinnaker.clouddriver.saga.models.Saga;
Expand All @@ -30,8 +31,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.experimental.NonFinal;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -98,12 +99,17 @@ public Result apply(@NotNull AttachTitusServiceLoadBalancersCommand command, @No
AttachTitusServiceLoadBalancersCommand.AttachTitusServiceLoadBalancersCommandBuilder
.class)
@JsonTypeName("attachTitusServiceLoadBalancersCommand")
@EqualsAndHashCode(callSuper = true)
@Value
public static class AttachTitusServiceLoadBalancersCommand extends SagaCommand {
public static class AttachTitusServiceLoadBalancersCommand implements SagaCommand {
@Nonnull private TitusDeployDescription description;
@Nonnull private String jobUri;
@Nullable private TargetGroupLookupHelper.TargetGroupLookupResult targetGroupLookupResult;
@NonFinal private EventMetadata metadata;

@Override
public void setMetadata(EventMetadata metadata) {
this.metadata = metadata;
}

@JsonPOJOBuilder(withPrefix = "")
public static class AttachTitusServiceLoadBalancersCommandBuilder {}
Expand Down
Loading

0 comments on commit 9d4d5cd

Please sign in to comment.