From 3d773390c84dc1044572f674f5b567a6978cd864 Mon Sep 17 00:00:00 2001 From: Sridevi Nagaraj Date: Wed, 16 Feb 2022 17:08:10 -0800 Subject: [PATCH 1/4] Skip onpartitionsrevoked during consumer.close() --- .../kafka/AbstractKafkaBasedConnectorTask.java | 18 ++++++++++++++---- .../KafkaMirrorMakerConnectorTask.java | 4 ++++ gradle/maven.gradle | 2 +- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index f1d5ab4aa..73d32e41a 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -90,7 +90,7 @@ abstract public class AbstractKafkaBasedConnectorTask implements Runnable, Consu // lifecycle private volatile Thread _connectorTaskThread; protected volatile boolean _shutdown = false; - private volatile boolean _failure = false; + private volatile boolean _skipOnPartitionsRevoked = false; protected volatile long _lastPolledTimeMillis = System.currentTimeMillis(); protected volatile long _lastPollCompletedTimeMillis = 0; protected final CountDownLatch _startedLatch = new CountDownLatch(1); @@ -256,6 +256,13 @@ protected String getTaskName() { return _taskName; } + /** + * Exposing the flag for overridden classes + */ + protected Boolean getSkipOnPartitionsRevoked() { + return _skipOnPartitionsRevoked; + } + /** * Translate the Kafka consumer records if necessary and send the batch of records to destination. * @param records the Kafka consumer records @@ -304,8 +311,6 @@ protected void rewindAndPausePartitionOnException(TopicPartition srcTopicPartiti } catch (Exception e) { // Seek to last checkpoint failed. Throw an exception to avoid any data loss scenarios where the consumed // offset can be committed even though the send for that offset has failed. - // This flag is used to address 2.4 kafka version behavior changes for onPartitionRevoked calls - _failure = true; String errorMessage = String.format("Partition rewind for %s failed due to ", srcTopicPartition); throw new DatastreamRuntimeException(errorMessage, e); } @@ -435,6 +440,7 @@ public void run() { } finally { if (null != _consumer) { try { + _skipOnPartitionsRevoked = true; _consumer.close(); } catch (Exception e) { _logger.warn(String.format("Got exception on consumer close for task %s.", _taskName), e); @@ -767,9 +773,13 @@ protected void updateConsumerAssignment(Collection partitions) { @Override public void onPartitionsRevoked(Collection topicPartitions) { + if (_skipOnPartitionsRevoked) { + _logger.warn("Skipping commit in onPartitionsRevoked due to an exception."); + return; + } _logger.info("Partition ownership revoked for {}, checkpointing.", topicPartitions); _kafkaTopicPartitionTracker.onPartitionsRevoked(topicPartitions); - if (!_shutdown && !topicPartitions.isEmpty() && !_failure) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode. + if (!_shutdown && !topicPartitions.isEmpty()) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode. try { maybeCommitOffsets(_consumer, true); // happens inline as part of poll } catch (Exception e) { diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index 4d7c92e04..4b2020c6c 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -370,6 +370,10 @@ public void stop() { @Override public void onPartitionsRevoked(Collection partitions) { + if (getSkipOnPartitionsRevoked()) { + _logger.warn("Skipping commit in onPartitionsRevoked due to exception."); + return; + } super.onPartitionsRevoked(partitions); _topicManager.onPartitionsRevoked(partitions); } diff --git a/gradle/maven.gradle b/gradle/maven.gradle index fc6aca855..57e073e06 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "2.0.0" + version = "3.0.0-SNAPSHOT" } subprojects { From 4db4ba28223b646a0e64b9501eca651f505085d5 Mon Sep 17 00:00:00 2001 From: Sridevi Nagaraj Date: Wed, 16 Feb 2022 17:14:40 -0800 Subject: [PATCH 2/4] Bumping up version --- gradle/maven.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/maven.gradle b/gradle/maven.gradle index 57e073e06..5ffd6c85b 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "3.0.0-SNAPSHOT" + version = "3.0.0" } subprojects { From bdf2c2b1604f1c065901e2830a455fc63b50b891 Mon Sep 17 00:00:00 2001 From: Sridevi Nagaraj Date: Mon, 7 Mar 2022 13:31:24 -0800 Subject: [PATCH 3/4] Fixing warning messages --- .../connectors/kafka/AbstractKafkaBasedConnectorTask.java | 2 +- .../kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index 73d32e41a..a587332e3 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -774,7 +774,7 @@ protected void updateConsumerAssignment(Collection partitions) { @Override public void onPartitionsRevoked(Collection topicPartitions) { if (_skipOnPartitionsRevoked) { - _logger.warn("Skipping commit in onPartitionsRevoked due to an exception."); + _logger.info("Skipping commit in onPartitionsRevoked during consumer.close()."); return; } _logger.info("Partition ownership revoked for {}, checkpointing.", topicPartitions); diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index 4b2020c6c..bc0f3651a 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -371,7 +371,7 @@ public void stop() { @Override public void onPartitionsRevoked(Collection partitions) { if (getSkipOnPartitionsRevoked()) { - _logger.warn("Skipping commit in onPartitionsRevoked due to exception."); + _logger.info("Skipping commit in onPartitionsRevoked during consumer.close()."); return; } super.onPartitionsRevoked(partitions); From 3a4351e1e74a97eadfe5cbd223944452fc554555 Mon Sep 17 00:00:00 2001 From: Sridevi Nagaraj Date: Tue, 8 Mar 2022 13:58:01 -0800 Subject: [PATCH 4/4] Fixing warning messages --- .../connectors/kafka/AbstractKafkaBasedConnectorTask.java | 4 ++-- .../kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index a587332e3..937090be9 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -259,7 +259,7 @@ protected String getTaskName() { /** * Exposing the flag for overridden classes */ - protected Boolean getSkipOnPartitionsRevoked() { + protected boolean getSkipOnPartitionsRevoked() { return _skipOnPartitionsRevoked; } @@ -774,7 +774,7 @@ protected void updateConsumerAssignment(Collection partitions) { @Override public void onPartitionsRevoked(Collection topicPartitions) { if (_skipOnPartitionsRevoked) { - _logger.info("Skipping commit in onPartitionsRevoked during consumer.close()."); + _logger.info("Skipping commit in onPartitionsRevoked during consumer.close()"); return; } _logger.info("Partition ownership revoked for {}, checkpointing.", topicPartitions); diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index bc0f3651a..c26003a29 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -371,7 +371,7 @@ public void stop() { @Override public void onPartitionsRevoked(Collection partitions) { if (getSkipOnPartitionsRevoked()) { - _logger.info("Skipping commit in onPartitionsRevoked during consumer.close()."); + _logger.info("Skipping commit in onPartitionsRevoked during consumer.close()"); return; } super.onPartitionsRevoked(partitions);