fix(saga): Handle concurrency issue when same op is sent more than once (#4229)

* fix(saga): Handle concurrency issue when same op is sent more than once

* Update SqlEventRepository.kt
robzienert authored and mergify[bot] committed Dec 18, 2019
1 parent 3f10a63 commit 1ab574c
Showing 4 changed files with 44 additions and 5 deletions.
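
The change makes duplicate submissions of the same atomic operation idempotent: when two clouddriver instances race to create the same event aggregate, the database's unique-constraint violation is rethrown as DuplicateEventAggregateException, and the orchestration layer reacts by returning the task that already exists for that clientRequestId instead of failing. A minimal standalone sketch of that pattern, using hypothetical simplified types rather than the actual clouddriver classes:

```kotlin
import java.sql.SQLIntegrityConstraintViolationException

// Hypothetical stand-ins for the real clouddriver types, for illustration only.
data class Task(val clientRequestId: String)

class DuplicateAggregateException(cause: Exception) : RuntimeException(cause)

class InMemoryAggregateStore {
  private val aggregates = mutableSetOf<String>()

  // Simulates the unique constraint on the aggregates table.
  fun insert(aggregateId: String) {
    if (!aggregates.add(aggregateId)) {
      // Translate the low-level constraint violation into a domain-level "duplicate" signal.
      throw DuplicateAggregateException(SQLIntegrityConstraintViolationException("duplicate key: $aggregateId"))
    }
  }
}

class Orchestrator(private val store: InMemoryAggregateStore) {
  private val tasks = mutableMapOf<String, Task>()

  fun submit(clientRequestId: String): Task {
    val task = tasks.getOrPut(clientRequestId) { Task(clientRequestId) }
    return try {
      store.insert(clientRequestId)
      task
    } catch (e: DuplicateAggregateException) {
      // The same operation is already running elsewhere; no-op and hand back the existing task.
      tasks.getValue(clientRequestId)
    }
  }
}

fun main() {
  val orchestrator = Orchestrator(InMemoryAggregateStore())
  val first = orchestrator.submit("op-123")
  val second = orchestrator.submit("op-123") // duplicate submission of the same operation
  println(first === second) // true: both callers end up with the same task
}
```

In the commit itself, the translation to DuplicateEventAggregateException happens in SqlEventRepository and the no-op return of the existing task happens in DefaultOrchestrationProcessor, as shown in the hunks below.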
@@ -21,6 +21,7 @@ import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.cats.thread.NamedThreadFactory
import com.netflix.spinnaker.clouddriver.data.task.Task
import com.netflix.spinnaker.clouddriver.data.task.TaskRepository
+import com.netflix.spinnaker.clouddriver.event.exceptions.DuplicateEventAggregateException
import com.netflix.spinnaker.clouddriver.metrics.TimedCallable
import com.netflix.spinnaker.clouddriver.orchestration.events.OperationEvent
import com.netflix.spinnaker.clouddriver.orchestration.events.OperationEventHandler
@@ -124,6 +125,11 @@ class DefaultOrchestrationProcessor implements OrchestrationProcessor {
task.updateStatus TASK_PHASE, "Orchestration failed: ${atomicOperation.class.simpleName} | ${e.class.simpleName}: [${e.errors.join(', ')}]"
task.addResultObjects([extractExceptionSummary(e, e.errors.join(", "), [operation: atomicOperation.class.simpleName])])
failTask(task, e)
+} catch (DuplicateEventAggregateException e) {
+  // In this case, we can safely assume that the atomic operation is being run elsewhere and can just return
+  // the existing task.
+  log.warn("Received duplicate event aggregate: Indicative of receiving the same operation twice. Noop'ing and returning the task pointer", e)
+  return getTask(clientRequestId)
} catch (e) {
def message = e.message
def stringWriter = new StringWriter()
@@ -0,0 +1,20 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.event.exceptions

import com.netflix.spinnaker.kork.exceptions.SystemException

class DuplicateEventAggregateException(e: Exception) : SystemException(e), EventingException
@@ -22,6 +22,7 @@ import com.netflix.spinnaker.clouddriver.event.CompositeSpinnakerEvent
import com.netflix.spinnaker.clouddriver.event.EventMetadata
import com.netflix.spinnaker.clouddriver.event.SpinnakerEvent
import com.netflix.spinnaker.clouddriver.event.exceptions.AggregateChangeRejectedException
+import com.netflix.spinnaker.clouddriver.event.exceptions.DuplicateEventAggregateException
import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository
import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository.ListAggregatesCriteria
import com.netflix.spinnaker.clouddriver.sql.transactional
@@ -37,6 +38,7 @@ import org.jooq.impl.DSL.max
import org.jooq.impl.DSL.table
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationEventPublisher
+import java.sql.SQLIntegrityConstraintViolationException
import java.util.UUID

class SqlEventRepository(
@@ -82,10 +84,21 @@ class SqlEventRepository(
field("version") to 0
)

-ctx.insertInto(AGGREGATES_TABLE)
-  .columns(initialAggregate.keys)
-  .values(initialAggregate.values)
-  .execute()
+try {
+  ctx.insertInto(AGGREGATES_TABLE)
+    .columns(initialAggregate.keys)
+    .values(initialAggregate.values)
+    .execute()
+} catch (e: SQLIntegrityConstraintViolationException) {
+  // In the event that two requests are made at the same time to create a new aggregate (via two diff
+  // clouddriver instances), catch the exception and bubble it up as a duplicate exception so that it
+  // may be processed in an idempotent way, rather than causing an error.
+  //
+  // This is preferential to going back to the database to load the existing aggregate record, since we
+  // already know the aggregate version will not match the originating version expected from this process
+  // and would fail just below anyway.
+  throw DuplicateEventAggregateException(e)
+}

Aggregate(aggregateType, aggregateId, 0)
}()
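
The comment in the new catch block argues that reloading the existing aggregate after a lost insert race would not help: the losing writer's originating version can no longer match the stored version, so the subsequent append would be rejected anyway (in this repository such a rejection presumably surfaces as AggregateChangeRejectedException, which the file already imports). A hedged sketch of that kind of optimistic version check, with hypothetical names rather than the repository's actual code:

```kotlin
// Hypothetical optimistic-concurrency guard, for illustration only.
class StaleAggregateException(message: String) : RuntimeException(message)

data class AggregateRecord(val id: String, val version: Long)

fun appendEvents(stored: AggregateRecord, originatingVersion: Long) {
  // A writer that lost the insert race still believes the aggregate is at version 0,
  // while the winning writer has already advanced the stored version.
  if (stored.version != originatingVersion) {
    throw StaleAggregateException(
      "Aggregate ${stored.id} is at version ${stored.version}, expected $originatingVersion"
    )
  }
  // ... persist the events and bump the stored version ...
}

fun main() {
  val stored = AggregateRecord("aggregate-1", version = 1L)
  try {
    appendEvents(stored, originatingVersion = 0L) // the losing writer's expectation
  } catch (e: StaleAggregateException) {
    println(e.message)
  }
}
```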
@@ -20,6 +20,6 @@ import java.util.concurrent.TimeUnit

@ConfigurationProperties("sql.agent.task-cleanup")
class SqlTaskCleanupAgentProperties {
-var completedTtlMs: Long = TimeUnit.MINUTES.toMillis(60)
+var completedTtlMs: Long = TimeUnit.DAYS.toMillis(4)
var batchSize: Int = 100
}
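
The last file extends how long completed tasks are kept before the SQL task cleanup agent removes them, raising the default completedTtlMs from 60 minutes to 4 days. The difference in milliseconds, for reference:

```kotlin
import java.util.concurrent.TimeUnit

fun main() {
  println(TimeUnit.MINUTES.toMillis(60)) // 3600000 ms   (old default: 1 hour)
  println(TimeUnit.DAYS.toMillis(4))     // 345600000 ms (new default: 4 days)
}
```

Because the class is bound with @ConfigurationProperties("sql.agent.task-cleanup"), operators who prefer the shorter retention can presumably still override the default through that property prefix (e.g. sql.agent.task-cleanup.completed-ttl-ms).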
