Skip to content

Commit

Permalink
fix(saga): Update sql task with saga IDs (#4062)
Browse files Browse the repository at this point in the history
Tasks are created prior to the atomic operation processor operation.  In order to bridge atomic operations and sagas, we store the saga IDs when added to the SqlTask during the bridge process.
  • Loading branch information
jonsie authored and robzienert committed Oct 2, 2019
1 parent d8e08c4 commit 459da3b
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,19 @@
*/
package com.netflix.spinnaker.clouddriver.data.task;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.Value;

@Value
@Builder(builderClassName = "SagaIdBuilder")
@JsonDeserialize(builder = SagaId.SagaIdBuilder.class)
public class SagaId {
@Nonnull String name;
@Nonnull String id;

@JsonPOJOBuilder(withPrefix = "")
public static class SagaIdBuilder {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public <T> T apply(@Nonnull ApplyCommandWrapper applyCommand) {
final String sagaName = applyCommand.sagaName;
final String sagaId = Optional.ofNullable(task.getRequestId()).orElse(task.getId());

task.addSagaId(new SagaId(sagaName, sagaId));
task.addSagaId(SagaId.builder().id(sagaId).name(sagaName).build());

applyCommand.sagaFlow.injectFirst(SnapshotAtomicOperationInput.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class SqlTask(
override fun addSagaId(sagaId: SagaId) {
this.dirty.set(true)
sagaIds.add(sagaId)
repository.updateSagaIds(this)
}

override fun getSagaIds(): MutableList<SagaId> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ class SqlTaskRepository(
return task
}

fun updateSagaIds(task: Task) {
return withPool(POOL_NAME) {
jooq.transactional(sqlRetryProperties.transactions) { ctx ->
ctx.update(tasksTable)
.set(field("saga_ids"), mapper.writeValueAsString(task.sagaIds))
.where(field("id").eq(task.id))
.execute()
}
}
}

override fun get(id: String): Task? {
return retrieveInternal(id)
}
Expand Down

0 comments on commit 459da3b

Please sign in to comment.