Skip to content

Commit

Permalink
[JBPM-9520] KafkaServerExtension poll is not freed
Browse files Browse the repository at this point in the history
Changing lock policy
  • Loading branch information
fjtirado committed Dec 16, 2020
1 parent 36dc690 commit b51199c
Showing 1 changed file with 31 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,18 @@ public class KafkaServerExtension implements KieServerExtension, DeploymentEvent
Runnable {

private static final Logger logger = LoggerFactory.getLogger(KafkaServerExtension.class);

public static final String EXTENSION_NAME = "Kafka";
static final String KAFKA_EXTENSION_PREFIX = "org.kie.server.jbpm-kafka.ext.";
static final String TOPIC_PREFIX = KAFKA_EXTENSION_PREFIX + "topics.";
static final String SIGNAL_MAPPING_PROPERTY = KAFKA_EXTENSION_PREFIX + "signals.mapping";
static final String MESSAGE_MAPPING_PROPERTY = KAFKA_EXTENSION_PREFIX + "message.mapping";
private static final Mapping SIGNAL_MAPPING_DEFAULT = Mapping.NONE;
private static final Mapping MESSAGE_MAPPING_DEFAULT = Mapping.AUTO;

enum Mapping {
AUTO, NONE
}


private AtomicBoolean initialized = new AtomicBoolean();
// Kafka consumer
Expand Down Expand Up @@ -176,24 +175,17 @@ public void destroy(KieServerImpl kieServer, KieServerRegistry registry) {
if (producerReady.compareAndSet(true, false)) {
producer.close(duration);
}

if (consumerReady.compareAndSet(true, false)) {
notifyService.getAndSet(null).shutdownNow();
consumer.wakeup();
consumerLock.lock();
try {
consumer.unsubscribe();
consumer.close(duration);
} finally {
consumerLock.unlock();
}
consumer = null;
}
changeRegistrationLock.lock();
try {
topic2Signal.clear();
topic2Message.clear();
isSubscribedCond.signal();
if (consumerReady.compareAndSet(true, false)) {
notifyService.getAndSet(null).shutdownNow();
unsubscribe();
consumer.close(duration);
consumer = null;
}
} finally {
changeRegistrationLock.unlock();
}
Expand Down Expand Up @@ -322,34 +314,28 @@ private void updateRegistration(DeploymentEvent event, BiConsumer<String, Proces
}
topic2Register.addAll(topic2Signal.keySet());
topic2Register.addAll(topic2Message.keySet());
} finally {
changeRegistrationLock.unlock();
}

if (topic2Register.isEmpty()) {
if (consumerReady.get()) {
if (topic2Register.isEmpty()) {
if (consumerReady.get()) {
unsubscribe();
}
} else if (consumerReady.compareAndSet(false, true)) {
logger.trace("Creating kafka consumer");
consumer = getKafkaConsumer();
subscribe(topic2Register);
notifyService.set(
new ThreadPoolExecutor(1, Integer.getInteger(KAFKA_EXTENSION_PREFIX + "maxNotifyThreads", 10),
60L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>()));
new Thread(this).start();
} else {
consumer.wakeup();
unsubscribe();
}
}
else if (consumerReady.compareAndSet(false, true)) {
logger.trace("Creating kafka consumer");
consumer = getKafkaConsumer();
subscribe(topic2Register);
notifyService.set(
new ThreadPoolExecutor(1, Integer.getInteger(KAFKA_EXTENSION_PREFIX + "maxNotifyThreads", 10), 60L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>()));
new Thread(this).start();
} else {
consumer.wakeup();
subscribe(topic2Register);
changeRegistrationLock.lock();
try {
subscribe(topic2Register);
isSubscribedCond.signal();
} finally {
changeRegistrationLock.unlock();
}
} finally {
changeRegistrationLock.unlock();
}

}

private void subscribe(Set<String> topic2Register) {
Expand All @@ -363,6 +349,7 @@ private void subscribe(Set<String> topic2Register) {
}

private void unsubscribe() {
consumer.wakeup();
consumerLock.lock();
try {
consumer.unsubscribe();
Expand All @@ -375,12 +362,11 @@ private void unsubscribe() {
private static <T extends SignalDescBase> String topicFromSignal(T signal) {
return topicFromSignal(signal.getName());
}

private static String topicFromSignal(String name) {
return System.getProperty(TOPIC_PREFIX + name, name);
}


@Override
public boolean isUpdateContainerAllowed(String id,
KieContainerInstance kieContainerInstance,
Expand Down Expand Up @@ -413,7 +399,6 @@ public String toString() {
return EXTENSION_NAME + " KIE Server extension";
}


@Override
public void run() {
Duration duration =
Expand Down Expand Up @@ -447,8 +432,8 @@ private void checkSubscribed() throws InterruptedException {
private ConsumerRecords<String, byte[]> pollEvents(Duration duration) {
ConsumerRecords<String, byte[]> events = ConsumerRecords.empty();
if (consumerReady.get()) {
consumerLock.lock();
try {
consumerLock.lock();
events = consumer.poll(duration);
} catch (WakeupException ex) {
logger.trace("Kafka wait interrupted");
Expand Down Expand Up @@ -486,8 +471,8 @@ private interface Signaller {
void signalEvent(String deploymentId, String signalName, Object data);
}

private Signaller messageSignaller = (deployment, signalName, data) -> signalEvent(deployment, "Message-" +
signalName, data);
private Signaller messageSignaller =
(deployment, signalName, data) -> signalEvent(deployment, "Message-" + signalName, data);

private void processEvent(ConsumerRecord<String, byte[]> event) {
changeRegistrationLock.lock();
Expand All @@ -499,7 +484,6 @@ private void processEvent(ConsumerRecord<String, byte[]> event) {
}
}


private void signalEvent(String deployment, String signalName, Object data) {
processService.signalEvent(deployment, signalName, data);
}
Expand Down Expand Up @@ -544,14 +528,11 @@ private <T extends SignalDescBase> Class<?> getDataClass(String deploymentId,
return dataClazz.orElse(Object.class);
}


@Override
public void onMessage(MessageEvent event) {
if (processMessages()) {
sendEvent(event.getProcessInstance(), event.getMessageName(), event.getMessage());
}


}

@Override
Expand All @@ -564,7 +545,6 @@ public void onSignal(SignalEvent event) {
private void sendEvent(ProcessInstance processInstance,
String name,
Object value) {

if (producerReady.compareAndSet(false, true)) {
producer = getKafkaProducer();
}
Expand All @@ -578,7 +558,6 @@ private void sendEvent(ProcessInstance processInstance,
} catch (Exception e) {
logError(value, e);
}

}

private static Mapping getMapping(String propName, Mapping defaultValue) {
Expand Down Expand Up @@ -649,7 +628,6 @@ public <T> T getAppComponents(Class<T> serviceType) {
return null;
}


@Override
public void beforeProcessStarted(ProcessStartedEvent event) {
// not interested
Expand Down

0 comments on commit b51199c

Please sign in to comment.