Skip to content

Commit

Permalink
fix(event): Fixing some concurrency issues in event store (#3954)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert committed Aug 16, 2019
1 parent 0ee7f7b commit 0fb4ed3
Showing 1 changed file with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.springframework.context.ApplicationEventPublisher
import org.springframework.scheduling.annotation.Scheduled
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import kotlin.math.max

/**
Expand All @@ -47,9 +48,8 @@ class InMemoryEventRepository(
private val eventWriteCountId = registry.createId("eventing.events.writes")
private val eventReadCountId = registry.createId("eventing.events.reads")

private val events: MutableMap<Aggregate, MutableList<SpinnakerEvent>> = mutableMapOf()
private val events: MutableMap<Aggregate, MutableList<SpinnakerEvent>> = ConcurrentHashMap()

@Synchronized
override fun save(
aggregateType: String,
aggregateId: String,
Expand All @@ -68,18 +68,21 @@ class InMemoryEventRepository(
"(version: ${aggregate.version}, originatingVersion: $originatingVersion)")
}

val currentSequence = this.events[aggregate]!!.map { it.metadata.sequence }.max() ?: 0
newEvents.forEachIndexed { index, sagaEvent ->
// TODO(rz): Plugin more metadata (provenance, serviceVersion, etc)
sagaEvent.metadata = EventMetadata(
sequence = currentSequence + (index + 1),
originatingVersion = originatingVersion
)
}
events.getOrPut(aggregate) { mutableListOf() }.let { aggregateEvents ->
val currentSequence = aggregateEvents.map { it.metadata.sequence }.max() ?: 0

newEvents.forEachIndexed { index, sagaEvent ->
// TODO(rz): Plugin more metadata (provenance, serviceVersion, etc)
sagaEvent.metadata = EventMetadata(
sequence = currentSequence + (index + 1),
originatingVersion = originatingVersion
)
}

registry.counter(eventWriteCountId).increment(newEvents.size.toLong())
this.events[aggregate]!!.addAll(newEvents)
aggregate.version = aggregate.version + 1
registry.counter(eventWriteCountId).increment(newEvents.size.toLong())
aggregateEvents.addAll(newEvents)
aggregate.version = aggregate.version + 1
}

log.debug("Saved $aggregateType/$aggregateId@${aggregate.version}: " +
"[${newEvents.joinToString(",") { it.javaClass.simpleName }}]")
Expand Down

0 comments on commit 0fb4ed3

Please sign in to comment.