Skip to content

Commit

Permalink
Avoid committing consumer after error
Browse files Browse the repository at this point in the history
  • Loading branch information
adrienlauer committed Sep 30, 2020
1 parent 14c25c9 commit 3a7afcc
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
# Version 2.0.2 (2020-09-30)

* [fix] Avoid wrongly committing Kafka consumer after an error, which consumed the message without any chance to process it.

# Version 2.0.1 (2020-08-05)

* [chg] Updated for seed 3.10+
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -19,7 +19,7 @@

<groupId>org.seedstack.addons.kafka</groupId>
<artifactId>kafka</artifactId>
<version>2.0.1-SNAPSHOT</version>
<version>2.0.2-SNAPSHOT</version>

<properties>
<seed.version>3.10.0</seed.version>
Expand Down
Expand Up @@ -7,18 +7,7 @@
*/
package org.seedstack.kafka.internal;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.inject.Injector;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -31,6 +20,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;

class ConsumerListenerHandler<K, V> implements Runnable, ListenerHandler {
private static final Duration POLLING_DELAY = Duration.ofMillis(1000);
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListenerHandler.class);
Expand Down Expand Up @@ -60,6 +61,7 @@ public Class<?> getListenerClass() {
public void run() {
String consumerName = annotation.value();
Duration retryDelay = Duration.ofMillis(annotation.retryDelay());
boolean errorOccurred = false;

try {
ConsumerListener<K, V> listener = injector.getInstance(listenerClass);
Expand All @@ -81,6 +83,7 @@ public void run() {
}
} catch (Exception e1) {
// Ignore exception processing if we are stopping
errorOccurred = true;
if (active.get()) {
try {
listener.onException(e1);
Expand All @@ -96,13 +99,15 @@ public void run() {
} finally {
synchronized (this) {
if (consumer != null) {
LOGGER.info("Synchronously committing Kafka consumer: {}", consumerName);
consumer.commitSync();
if (!errorOccurred) {
LOGGER.info("Synchronously committing Kafka consumer: {}", consumerName);
consumer.commitSync();
}
LOGGER.info("Closing Kafka consumer: {}", consumerName);
try {
consumer.close();
} catch (Exception e) {
LOGGER.warn("Unable to properly close Kafka consumer: {}", e);
LOGGER.warn("Unable to properly close Kafka consumer: {}", consumerName, e);
}
}
}
Expand Down

0 comments on commit 3a7afcc

Please sign in to comment.