diff --git a/CHANGELOG.md b/CHANGELOG.md
index b436df1..f1ff748 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+# Version 2.0.2 (2020-09-30)
+
+* [fix] Avoid wrongly committing Kafka consumer after an error, which consumed the message without any chance to process it.
+
# Version 2.0.1 (2020-08-05)
* [chg] Updated for seed 3.10+
diff --git a/pom.xml b/pom.xml
index c5ecf84..6a17057 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@
org.seedstack.addons.kafka
kafka
- 2.0.1-SNAPSHOT
+ 2.0.2-SNAPSHOT
3.10.0
diff --git a/src/main/java/org/seedstack/kafka/internal/ConsumerListenerHandler.java b/src/main/java/org/seedstack/kafka/internal/ConsumerListenerHandler.java
index f9319ab..42a6b4b 100644
--- a/src/main/java/org/seedstack/kafka/internal/ConsumerListenerHandler.java
+++ b/src/main/java/org/seedstack/kafka/internal/ConsumerListenerHandler.java
@@ -7,18 +7,7 @@
*/
package org.seedstack.kafka.internal;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.inject.Injector;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import javax.inject.Inject;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -31,6 +20,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.inject.Inject;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
class ConsumerListenerHandler implements Runnable, ListenerHandler {
private static final Duration POLLING_DELAY = Duration.ofMillis(1000);
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListenerHandler.class);
@@ -60,6 +61,7 @@ public Class> getListenerClass() {
public void run() {
String consumerName = annotation.value();
Duration retryDelay = Duration.ofMillis(annotation.retryDelay());
+ boolean errorOccurred = false;
try {
ConsumerListener listener = injector.getInstance(listenerClass);
@@ -81,6 +83,7 @@ public void run() {
}
} catch (Exception e1) {
// Ignore exception processing if we are stopping
+ errorOccurred = true;
if (active.get()) {
try {
listener.onException(e1);
@@ -96,13 +99,15 @@ public void run() {
} finally {
synchronized (this) {
if (consumer != null) {
- LOGGER.info("Synchronously committing Kafka consumer: {}", consumerName);
- consumer.commitSync();
+ if (!errorOccurred) {
+ LOGGER.info("Synchronously committing Kafka consumer: {}", consumerName);
+ consumer.commitSync();
+ }
LOGGER.info("Closing Kafka consumer: {}", consumerName);
try {
consumer.close();
} catch (Exception e) {
- LOGGER.warn("Unable to properly close Kafka consumer: {}", e);
+ LOGGER.warn("Unable to properly close Kafka consumer: {}", consumerName, e);
}
}
}