diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index f1d5ab4aa..937090be9 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -90,7 +90,7 @@ abstract public class AbstractKafkaBasedConnectorTask implements Runnable, Consu // lifecycle private volatile Thread _connectorTaskThread; protected volatile boolean _shutdown = false; - private volatile boolean _failure = false; + private volatile boolean _skipOnPartitionsRevoked = false; protected volatile long _lastPolledTimeMillis = System.currentTimeMillis(); protected volatile long _lastPollCompletedTimeMillis = 0; protected final CountDownLatch _startedLatch = new CountDownLatch(1); @@ -256,6 +256,13 @@ protected String getTaskName() { return _taskName; } + /** + * Exposing the flag for overridden classes + */ + protected boolean getSkipOnPartitionsRevoked() { + return _skipOnPartitionsRevoked; + } + /** * Translate the Kafka consumer records if necessary and send the batch of records to destination. * @param records the Kafka consumer records @@ -304,8 +311,6 @@ protected void rewindAndPausePartitionOnException(TopicPartition srcTopicPartiti } catch (Exception e) { // Seek to last checkpoint failed. Throw an exception to avoid any data loss scenarios where the consumed // offset can be committed even though the send for that offset has failed. - // This flag is used to address 2.4 kafka version behavior changes for onPartitionRevoked calls - _failure = true; String errorMessage = String.format("Partition rewind for %s failed due to ", srcTopicPartition); throw new DatastreamRuntimeException(errorMessage, e); } @@ -435,6 +440,7 @@ public void run() { } finally { if (null != _consumer) { try { + _skipOnPartitionsRevoked = true; _consumer.close(); } catch (Exception e) { _logger.warn(String.format("Got exception on consumer close for task %s.", _taskName), e); @@ -767,9 +773,13 @@ protected void updateConsumerAssignment(Collection partitions) { @Override public void onPartitionsRevoked(Collection topicPartitions) { + if (_skipOnPartitionsRevoked) { + _logger.info("Skipping commit in onPartitionsRevoked during consumer.close()"); + return; + } _logger.info("Partition ownership revoked for {}, checkpointing.", topicPartitions); _kafkaTopicPartitionTracker.onPartitionsRevoked(topicPartitions); - if (!_shutdown && !topicPartitions.isEmpty() && !_failure) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode. + if (!_shutdown && !topicPartitions.isEmpty()) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode. try { maybeCommitOffsets(_consumer, true); // happens inline as part of poll } catch (Exception e) { diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index 4d7c92e04..c26003a29 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -370,6 +370,10 @@ public void stop() { @Override public void onPartitionsRevoked(Collection partitions) { + if (getSkipOnPartitionsRevoked()) { + _logger.info("Skipping commit in onPartitionsRevoked during consumer.close()"); + return; + } super.onPartitionsRevoked(partitions); _topicManager.onPartitionsRevoked(partitions); } diff --git a/gradle/maven.gradle b/gradle/maven.gradle index fc6aca855..5ffd6c85b 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "2.0.0" + version = "3.0.0" } subprojects {