Skip to content

Commit

Permalink
skipping onPartitionsRevoked during consumer.close() call (#886)
Browse files Browse the repository at this point in the history
* Skip onpartitionsrevoked during consumer.close()

* Bumping up version

* Fixing warning messages

* Fixing warning messages
  • Loading branch information
srinagaraj committed Mar 9, 2022
1 parent 4d867ef commit 37544cf
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ abstract public class AbstractKafkaBasedConnectorTask implements Runnable, Consu
// lifecycle
private volatile Thread _connectorTaskThread;
protected volatile boolean _shutdown = false;
private volatile boolean _failure = false;
private volatile boolean _skipOnPartitionsRevoked = false;
protected volatile long _lastPolledTimeMillis = System.currentTimeMillis();
protected volatile long _lastPollCompletedTimeMillis = 0;
protected final CountDownLatch _startedLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -256,6 +256,13 @@ protected String getTaskName() {
return _taskName;
}

/**
* Exposing the flag for overridden classes
*/
protected boolean getSkipOnPartitionsRevoked() {
return _skipOnPartitionsRevoked;
}

/**
* Translate the Kafka consumer records if necessary and send the batch of records to destination.
* @param records the Kafka consumer records
Expand Down Expand Up @@ -304,8 +311,6 @@ protected void rewindAndPausePartitionOnException(TopicPartition srcTopicPartiti
} catch (Exception e) {
// Seek to last checkpoint failed. Throw an exception to avoid any data loss scenarios where the consumed
// offset can be committed even though the send for that offset has failed.
// This flag is used to address 2.4 kafka version behavior changes for onPartitionRevoked calls
_failure = true;
String errorMessage = String.format("Partition rewind for %s failed due to ", srcTopicPartition);
throw new DatastreamRuntimeException(errorMessage, e);
}
Expand Down Expand Up @@ -435,6 +440,7 @@ public void run() {
} finally {
if (null != _consumer) {
try {
_skipOnPartitionsRevoked = true;
_consumer.close();
} catch (Exception e) {
_logger.warn(String.format("Got exception on consumer close for task %s.", _taskName), e);
Expand Down Expand Up @@ -767,9 +773,13 @@ protected void updateConsumerAssignment(Collection<TopicPartition> partitions) {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> topicPartitions) {
if (_skipOnPartitionsRevoked) {
_logger.info("Skipping commit in onPartitionsRevoked during consumer.close()");
return;
}
_logger.info("Partition ownership revoked for {}, checkpointing.", topicPartitions);
_kafkaTopicPartitionTracker.onPartitionsRevoked(topicPartitions);
if (!_shutdown && !topicPartitions.isEmpty() && !_failure) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode.
if (!_shutdown && !topicPartitions.isEmpty()) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode.
try {
maybeCommitOffsets(_consumer, true); // happens inline as part of poll
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ public void stop() {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
if (getSkipOnPartitionsRevoked()) {
_logger.info("Skipping commit in onPartitionsRevoked during consumer.close()");
return;
}
super.onPartitionsRevoked(partitions);
_topicManager.onPartitionsRevoked(partitions);
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/maven.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allprojects {
version = "2.0.0"
version = "3.0.0"
}

subprojects {
Expand Down

0 comments on commit 37544cf

Please sign in to comment.