Skip to content

Commit

Permalink
[RHPAM-4898] Fix concurrency issue in KafkaServerRegistration (#3034)
Browse files Browse the repository at this point in the history
* [FIX_KAFKA_EXECUTOR] fix executor service

* Fix logging output for processed ConsumerRecords

* Avoid synchronization on topic maps

* Restore synchronized method signatures

---------

Co-authored-by: Enrique Gonzalez Martinez <Enrique.Gonzalez.Martinez1@ibm.com>
  • Loading branch information
martinweiler and elguardian committed Mar 6, 2024
1 parent 106ca10 commit afef8f7
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,10 @@ private void processEvents(ConsumerRecords<String, byte[]> events) {


private void printEventsLog(ConsumerRecords<String, byte[]> events) {
Map<String, Integer> eventsPerTopic = new HashMap<>();
for (ConsumerRecord<String, byte[]> event : events) {
logger.trace("Kafka event received {}", event);
eventsPerTopic.compute(event.topic(), (k, v) -> v == null ? 1 : v++);
}
logger.debug("Number of events received per topic {}", eventsPerTopic);
logger.debug("Number of events received per topic {}", events.count());
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jbpm.kie.services.impl.KModuleDeploymentUnit;
Expand All @@ -42,8 +43,8 @@

class KafkaServerRegistration {

private Map<String, Map<SignalDesc, Map<DeploymentId, SortedSet<VersionedDeploymentId>>>> topic2Signal = new HashMap<>();
private Map<String, Map<MessageDesc, Map<DeploymentId, SortedSet<VersionedDeploymentId>>>> topic2Message = new HashMap<>();
private Map<String, Map<SignalDesc, Map<DeploymentId, SortedSet<VersionedDeploymentId>>>> topic2Signal = new ConcurrentHashMap<>();
private Map<String, Map<MessageDesc, Map<DeploymentId, SortedSet<VersionedDeploymentId>>>> topic2Message = new ConcurrentHashMap<>();

synchronized void close() {
topic2Signal.clear();
Expand Down Expand Up @@ -100,7 +101,7 @@ void forEachMessage(ConsumerRecord<String, byte[]> event, KafkaServerEventProces
forEach(topic2Message, event, eventProcessor);
}

private synchronized <T extends SignalDescBase> void forEach(Map<String, Map<T, Map<DeploymentId, SortedSet<VersionedDeploymentId>>>> topic2SignalBase,
private <T extends SignalDescBase> void forEach(Map<String, Map<T, Map<DeploymentId, SortedSet<VersionedDeploymentId>>>> topic2SignalBase,
ConsumerRecord<String, byte[]> event,
KafkaServerEventProcessor<T> processor) {
Map<T, Map<DeploymentId, SortedSet<VersionedDeploymentId>>> signalInfo = topic2SignalBase.get(event.topic());
Expand Down

0 comments on commit afef8f7

Please sign in to comment.