Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

skipping onPartitionsRevoked during consumer.close() call #886

Merged
merged 4 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ abstract public class AbstractKafkaBasedConnectorTask implements Runnable, Consu
// lifecycle
private volatile Thread _connectorTaskThread;
protected volatile boolean _shutdown = false;
private volatile boolean _failure = false;
private volatile boolean _skipOnPartitionsRevoked = false;
protected volatile long _lastPolledTimeMillis = System.currentTimeMillis();
protected volatile long _lastPollCompletedTimeMillis = 0;
protected final CountDownLatch _startedLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -256,6 +256,13 @@ protected String getTaskName() {
return _taskName;
}

/**
* Exposing the flag for overridden classes
*/
protected Boolean getSkipOnPartitionsRevoked() {
srinagaraj marked this conversation as resolved.
Show resolved Hide resolved
return _skipOnPartitionsRevoked;
}

/**
* Translate the Kafka consumer records if necessary and send the batch of records to destination.
* @param records the Kafka consumer records
Expand Down Expand Up @@ -304,8 +311,6 @@ protected void rewindAndPausePartitionOnException(TopicPartition srcTopicPartiti
} catch (Exception e) {
// Seek to last checkpoint failed. Throw an exception to avoid any data loss scenarios where the consumed
// offset can be committed even though the send for that offset has failed.
// This flag is used to address 2.4 kafka version behavior changes for onPartitionRevoked calls
_failure = true;
String errorMessage = String.format("Partition rewind for %s failed due to ", srcTopicPartition);
throw new DatastreamRuntimeException(errorMessage, e);
}
Expand Down Expand Up @@ -435,6 +440,7 @@ public void run() {
} finally {
if (null != _consumer) {
try {
_skipOnPartitionsRevoked = true;
_consumer.close();
} catch (Exception e) {
_logger.warn(String.format("Got exception on consumer close for task %s.", _taskName), e);
Expand Down Expand Up @@ -767,9 +773,13 @@ protected void updateConsumerAssignment(Collection<TopicPartition> partitions) {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> topicPartitions) {
if (_skipOnPartitionsRevoked) {
_logger.warn("Skipping commit in onPartitionsRevoked due to an exception.");
srinagaraj marked this conversation as resolved.
Show resolved Hide resolved
return;
}
_logger.info("Partition ownership revoked for {}, checkpointing.", topicPartitions);
_kafkaTopicPartitionTracker.onPartitionsRevoked(topicPartitions);
if (!_shutdown && !topicPartitions.isEmpty() && !_failure) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode.
if (!_shutdown && !topicPartitions.isEmpty()) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode.
try {
maybeCommitOffsets(_consumer, true); // happens inline as part of poll
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ public void stop() {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
if (getSkipOnPartitionsRevoked()) {
_logger.warn("Skipping commit in onPartitionsRevoked due to exception.");
srinagaraj marked this conversation as resolved.
Show resolved Hide resolved
return;
}
super.onPartitionsRevoked(partitions);
_topicManager.onPartitionsRevoked(partitions);
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/maven.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allprojects {
version = "2.0.0"
version = "3.0.0"
}

subprojects {
Expand Down