[Fix #1395] Refining All strategy correlation persistence approach #1398
Pull request overview
Refines the persistence-backed “all-strategy” event correlation flow by changing retrieveEvents from returning a newly-built map to instead populating a caller-provided map, and by adjusting correlation to work off a pre-initialized regId→events structure.
Changes:
- Updated `CorrelationOperations.retrieveEvents` to a side-effecting `void retrieveEvents(Map<String, List<CloudEvent>> ...)` API.
- Refactored `AbstractAllStrategyCorrelationInfo` to initialize the regId→events map up-front, load persisted events into it, then run correlation and mark processed.
- Updated the BigMap persistence implementation to match the new `retrieveEvents` signature (plus minor formatting changes in `ScheduledEventConsumer`).
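The contract change summarized above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `Event`, `EventStore`, and `InMemoryEventStore` are hypothetical stand-ins for `CloudEvent`, `CorrelationOperations`, and a persistence implementation, and the value type is assumed to be `Collection`, as in the later diff, rather than the `List` mentioned in the summary.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RetrieveEventsDemo {
  // Hypothetical stand-in for io.cloudevents.CloudEvent, so the sketch has no dependencies.
  record Event(String id) {}

  // Assumed shape of the new contract: the caller owns the map, the implementor only fills it.
  interface EventStore {
    void retrieveEvents(Map<String, Collection<Event>> eventsByRegId);
  }

  // Hypothetical in-memory implementor; this is not the BigMap implementation from the PR.
  static class InMemoryEventStore implements EventStore {
    private final Map<String, List<Event>> persisted = new LinkedHashMap<>();

    void store(String regId, Event event) {
      persisted.computeIfAbsent(regId, k -> new ArrayList<>()).add(event);
    }

    @Override
    public void retrieveEvents(Map<String, Collection<Event>> eventsByRegId) {
      // Fill only the registrations the caller asked for; never create keys or collections.
      eventsByRegId.forEach(
          (regId, target) -> target.addAll(persisted.getOrDefault(regId, List.of())));
    }
  }

  static Map<String, Collection<Event>> load() {
    InMemoryEventStore store = new InMemoryEventStore();
    store.store("regA", new Event("1"));
    store.store("regC", new Event("9")); // persisted, but not an expected registration

    // The caller pre-populates one empty, modifiable collection per expected registration.
    Map<String, Collection<Event>> events = new LinkedHashMap<>();
    events.put("regA", new ArrayList<>());
    events.put("regB", new ArrayList<>());
    store.retrieveEvents(events);
    return events;
  }
}
```

Because the caller builds the map, the key set and the collection type cannot drift between implementors, which is the implicit constraint the returned-map version left unstated.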
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java | Updates BigMap persistence implementation to the new side-effecting retrieveEvents API. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java | Changes retrieveEvents contract from returning a map to mutating a provided one. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java | Refactors correlation flow to use an initialized regId→events map and new retrieveEvents contract. |
| impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java | Minor lambda formatting adjustment (no functional change). |
There were some implicit constraints in the map to be returned by `retrieveEvents`. I think it is better to pass the Map already populated with the expected registrations, each associated with an empty modifiable collection, and let the implementor just add the cloud events to each collection. Also, the event can be stored at the end, reducing the likelihood of it being used for calculations in a different cluster.
Signed-off-by: fjtirado <ftirados@redhat.com>
Map<String, Iterator<CloudEvent>> iteratingEvents =
    events.entrySet().stream()
        .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator()));
Ideally I would like to use `SequencedCollection` rather than just `Collection` here, to avoid the map of iterators (and just call `removeFirst` on the populated collection), but that interface was added in Java 21 and we need to stay compatible with JDK 17.
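As a rough illustration of the JDK 17 compatible workaround described in this comment, the sketch below keeps one iterator per registration and takes one element from each on every round. The `drain` method and its "one event per registration per round" rule are assumptions made for the example; they are not the PR's `checkCorrelation` logic, and plain strings stand in for cloud events.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

class IteratorDrainDemo {
  // Takes one element per registration on each round, while every registration still has one.
  static List<Map<String, String>> drain(Map<String, Collection<String>> events) {
    // Collection has no removeFirst on JDK 17, so keep a cursor per registration instead.
    Map<String, Iterator<String>> iterators =
        events.entrySet().stream()
            .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator()));

    List<Map<String, String>> rounds = new ArrayList<>();
    while (!iterators.isEmpty() && iterators.values().stream().allMatch(Iterator::hasNext)) {
      Map<String, String> round = new LinkedHashMap<>();
      iterators.forEach((regId, it) -> round.put(regId, it.next()));
      rounds.add(round);
    }
    return rounds;
  }
}
```

On Java 21 and later, the iterator map could be dropped in favor of calling `removeFirst` on a `SequencedCollection`, which is the alternative the comment refers to.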
private Map<String, Collection<CloudEvent>> initMap() {
  return id2RegMapping.keySet().stream()
      .collect(Collectors.toMap(k -> k, k -> new LinkedHashSet<>()));
The AI was right: although it is unlikely, we should prevent duplicate ids in case the implementor messes up, so I am using a `LinkedHashSet` (see my comment about `SequencedCollection`).
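A small sketch of the property being relied on: a `LinkedHashSet` silently drops an element that is equal to one already present, while keeping insertion order. The `Event` record is a hypothetical stand-in for `CloudEvent`; note that the set deduplicates by `equals`/`hashCode`, so with real cloud events the effect depends on how the concrete event class implements equality.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

class DedupDemo {
  // Hypothetical stand-in for CloudEvent; records compare by their components (here, the id).
  record Event(String id) {}

  // Returns the ids that remain after adding all incoming events to a LinkedHashSet.
  static List<String> idsAfterAdding(List<Event> incoming) {
    Collection<Event> events = new LinkedHashSet<>();
    events.addAll(incoming); // an equal event added twice is stored once
    return events.stream().map(Event::id).toList();
  }
}
```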
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java:100
`eventAdded` now adds the incoming event to the correlation set unconditionally after `retrieveEvents(...)`. If the same CloudEvent id was already marked as processed, it will have been filtered out of `retrieveEvents`, but this code re-introduces it and can correlate/start a workflow again for an already-processed event id (a regression vs the previous flow where the event was stored first and then filtered by `processedCes`). Consider checking the processed state for (regId, eventId) before adding/processing the incoming event (e.g., add an `isProcessed(regId, eventId)` operation, or have `retrieveEvents`/another API return processed ids so duplicates can be dropped).
Map<String, Collection<CloudEvent>> events = initMap();
operations.retrieveEvents(events);
events.get(reg).add(event);
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
operations.storeEvent(reg, event);
markProcessed(operations, result);
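One way the suggested guard could look is sketched below, under heavy assumptions: `isProcessed` is the reviewer's proposed operation and does not exist in the PR, `Operations` is a hypothetical in-memory stand-in for the persistence operations, `Event` stands in for `CloudEvent`, and correlation is simplified to "every registration has at least one event".

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ProcessedGuardDemo {
  // Hypothetical stand-in for CloudEvent.
  record Event(String id) {}

  // Hypothetical in-memory operations; isProcessed is the reviewer's proposal, not an existing API.
  static class Operations {
    private final Map<String, List<Event>> stored = new HashMap<>();
    private final Map<String, Set<String>> processed = new HashMap<>();

    boolean isProcessed(String regId, String eventId) {
      return processed.getOrDefault(regId, Set.of()).contains(eventId);
    }

    void markProcessed(String regId, String eventId) {
      processed.computeIfAbsent(regId, k -> new HashSet<>()).add(eventId);
    }

    void storeEvent(String regId, Event event) {
      stored.computeIfAbsent(regId, k -> new ArrayList<>()).add(event);
    }

    // Fills the caller's collections, skipping events already marked as processed.
    void retrieveEvents(Map<String, Collection<Event>> events) {
      events.forEach(
          (regId, target) ->
              stored.getOrDefault(regId, List.of()).stream()
                  .filter(e -> !isProcessed(regId, e.id()))
                  .forEach(target::add));
    }
  }

  // Returns true when every registration has at least one event (simplified "all" strategy).
  static boolean eventAdded(Operations ops, Set<String> regIds, String regId, Event event) {
    if (ops.isProcessed(regId, event.id())) {
      return false; // drop the redelivered event before it can correlate again
    }
    Map<String, Collection<Event>> events = new HashMap<>();
    regIds.forEach(id -> events.put(id, new LinkedHashSet<>()));
    ops.retrieveEvents(events);
    events.get(regId).add(event);
    boolean correlated = events.values().stream().noneMatch(Collection::isEmpty);
    ops.storeEvent(regId, event); // stored only after the correlation check, as in the PR
    if (correlated) {
      events.forEach((id, evs) -> ops.markProcessed(id, evs.iterator().next().id()));
    }
    return correlated;
  }
}
```

With a single registration, delivering the same event twice correlates once; without the early `isProcessed` check, the second delivery would pass the filter in `retrieveEvents` (which only sees stored events) and correlate again.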
ricardozanini left a comment
Are you planning to add tests to validate this change? I think we can spam the same workflow on different threads and check for correlation.
Good point
Signed-off-by: fjtirado <ftirados@redhat.com>
There were some implicit constraints in the map to be returned by the `retrieveEvents` method, which makes it better to pass the Map already populated with the expected registrations, each associated with an empty modifiable list. This forces implementors to just add the cloud events to the list of each registration (they are no longer responsible for creating the map or the lists).
Also, the event should be stored after the correlation check, reducing the likelihood of it being used for calculations in a different cluster, even when no suitable DB locking strategy exists.