Skip to content

Commit

Permalink
GH-1587: Option to Correct Transactional Offsets
Browse files Browse the repository at this point in the history
Resolves #1587

See the Javadoc of `ConsumerProperties.setFixTxOffsets()` for more information.

**cherry-pick to 2.5.x**

* Add `this.` to `logger` to honor Checkstyle
  • Loading branch information
garyrussell authored and artembilan committed Sep 15, 2020
1 parent a0e2b25 commit 80aa8fd
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public class ConsumerProperties {

private int commitRetries = DEFAULT_COMMIT_RETRIES;

private boolean fixTxOffsets;

/**
* Create properties for a container that will subscribe to the specified topics.
* @param topics the topics.
Expand Down Expand Up @@ -390,6 +392,32 @@ public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
this.onlyLogRecordMetadata = onlyLogRecordMetadata;
}

/**
 * Return whether the container corrects terminal transactional offsets.
 * @return true if the offsets are corrected.
 * @since 2.5.6
 * @see #setFixTxOffsets(boolean)
 */
public boolean isFixTxOffsets() {
	return this.fixTxOffsets;
}

/**
 * Set to true to have the container correct offsets that are mis-reported at the
 * end of a partition holding transactional records. When the records were produced
 * by a transactional producer and the consumer is positioned at the end of the
 * partition, the lag can incorrectly be reported as greater than zero, because of
 * the pseudo record used to indicate transaction commit/rollback and, possibly, the
 * presence of rolled-back records. The consumer is not functionally affected, but
 * some users have expressed concern that the "lag" is non-zero. The check is
 * performed before the next poll, rather than while committing, to avoid adding
 * significant complexity to the commit processing.
 * @param fixTxOffsets true to correct the offset(s).
 * @since 2.5.6
 */
public void setFixTxOffsets(boolean fixTxOffsets) {
	this.fixTxOffsets = fixTxOffsets;
}

@Override
public String toString() {
return "ConsumerProperties ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final Collection<TopicPartition> assignedPartitions = new LinkedHashSet<>();

private final Map<TopicPartition, OffsetAndMetadata> lastCommits = new HashMap<>();

private final GenericMessageListener<?> genericListener;

private final ConsumerSeekAware consumerSeekAwareListener;
Expand Down Expand Up @@ -557,6 +559,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final String clientId;

private final boolean fixTxOffsets = this.containerProperties.isFixTxOffsets();

private Map<TopicPartition, OffsetMetadata> definedPartitions;

private int count;
Expand Down Expand Up @@ -1090,6 +1094,7 @@ protected void pollAndInvoke() {
if (!this.autoCommit && !this.isRecordAck) {
processCommits();
}
fixTxOffsetsIfNeeded();
idleBetweenPollIfNecessary();
if (this.seeks.size() > 0) {
processSeeks();
Expand Down Expand Up @@ -1124,6 +1129,42 @@ protected void pollAndInvoke() {
}
}

/**
 * When {@code fixTxOffsets} is enabled, compare the consumer's current position
 * with the offset recorded in {@code lastCommits} for each partition and commit
 * the position for every partition where it is ahead of that offset. Without a
 * transaction template the correction is committed directly (sync or async,
 * depending on {@code syncCommits}); otherwise the offsets are sent within a
 * transaction. Any exception is logged and not propagated.
 */
@SuppressWarnings("rawtypes")
private void fixTxOffsetsIfNeeded() {
	if (!this.fixTxOffsets) {
		return;
	}
	try {
		Map<TopicPartition, OffsetAndMetadata> corrections = new HashMap<>();
		for (Map.Entry<TopicPartition, OffsetAndMetadata> lastCommit : this.lastCommits.entrySet()) {
			TopicPartition partition = lastCommit.getKey();
			long currentPosition = this.consumer.position(partition);
			// position beyond the last commit: the committed offset needs correcting
			if (currentPosition > lastCommit.getValue().offset()) {
				corrections.put(partition, new OffsetAndMetadata(currentPosition));
			}
		}
		if (corrections.isEmpty()) {
			return;
		}
		this.logger.debug(() -> "Fixing TX offsets: " + corrections);
		if (this.transactionTemplate != null) {
			// transactional container: send the corrected offsets via the bound producer
			this.transactionTemplate.executeWithoutResult(status -> doSendOffsets(
					((KafkaResourceHolder) TransactionSynchronizationManager
							.getResource(this.kafkaTxManager.getProducerFactory())).getProducer(),
					corrections));
		}
		else if (this.syncCommits) {
			commitSync(corrections);
		}
		else {
			commitAsync(corrections, 0);
		}
	}
	catch (Exception e) {
		this.logger.error(e, "Failed to correct transactional offset(s)");
	}
}

private ConsumerRecords<K, V> doPoll() {
ConsumerRecords<K, V> records;
if (this.isBatchListener && this.subBatchPerPartition) {
Expand Down Expand Up @@ -1215,8 +1256,7 @@ private void checkIdle() {
long now = System.currentTimeMillis();
if (now > this.lastReceive + this.containerProperties.getIdleEventInterval()
&& now > this.lastAlertAt + this.containerProperties.getIdleEventInterval()) {
publishIdleContainerEvent(now - this.lastReceive, this.isConsumerAwareListener
? this.consumer : null, this.consumerPaused);
publishIdleContainerEvent(now - this.lastReceive, this.consumer, this.consumerPaused);
this.lastAlertAt = now;
if (this.consumerSeekAwareListener != null) {
Collection<TopicPartition> partitions = getAssignedPartitions();
Expand Down Expand Up @@ -1392,6 +1432,9 @@ private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits, int ret
}
else {
this.commitCallback.onComplete(offsetsAttempted, exception);
if (this.fixTxOffsets) {
this.lastCommits.putAll(commits);
}
}
});
}
Expand Down Expand Up @@ -2022,6 +2065,9 @@ private void doSendOffsets(Producer<?, ?> prod, Map<TopicPartition, OffsetAndMet
else {
prod.sendOffsetsToTransaction(commits, this.consumer.groupMetadata());
}
if (this.fixTxOffsets) {
this.lastCommits.putAll(commits);
}
}

private void processCommits() {
Expand Down Expand Up @@ -2245,6 +2291,9 @@ private void commitSync(Map<TopicPartition, OffsetAndMetadata> commits) {
private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
try {
this.consumer.commitSync(commits, this.syncCommitTimeout);
if (this.fixTxOffsets) {
this.lastCommits.putAll(commits);
}
}
catch (RetriableCommitFailedException e) {
if (retries >= this.containerProperties.getCommitRetries()) {
Expand Down Expand Up @@ -2450,6 +2499,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
try {
// Wait until now to commit, in case the user listener added acks
commitPendingAcks();
fixTxOffsetsIfNeeded();
}
catch (Exception e) {
ListenerConsumer.this.logger.error(e, () -> "Fatal commit error after revocation "
Expand All @@ -2465,6 +2515,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
if (ListenerConsumer.this.assignedPartitions != null) {
ListenerConsumer.this.assignedPartitions.removeAll(partitions);
}
partitions.forEach(tp -> ListenerConsumer.this.lastCommits.remove(tp));
}
finally {
if (ListenerConsumer.this.kafkaTxManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerFactoryUtils;
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
Expand Down Expand Up @@ -499,7 +501,9 @@ public void testRollbackRecord() throws Exception {
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic1, topic2);
containerProps.setGroupId("group");
containerProps.setPollTimeout(10_000);
containerProps.setPollTimeout(500L);
containerProps.setIdleEventInterval(500L);
containerProps.setFixTxOffsets(true);

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
Expand Down Expand Up @@ -533,6 +537,17 @@ public void testRollbackRecord() throws Exception {
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testRollbackRecord");
AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
CountDownLatch idleLatch = new CountDownLatch(1);
container.setApplicationEventPublisher(event -> {
if (event instanceof ListenerContainerIdleEvent) {
Consumer<?, ?> consumer = ((ListenerContainerIdleEvent) event).getConsumer();
committed.set(consumer.committed(Set.of(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))));
if (committed.get().get(new TopicPartition(topic1, 0)) != null) {
idleLatch.countDown();
}
}
});
container.start();

template.setDefaultTopic(topic1);
Expand All @@ -541,6 +556,10 @@ public void testRollbackRecord() throws Exception {
return null;
});
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(idleLatch.await(10, TimeUnit.SECONDS)).isTrue();
TopicPartition partition0 = new TopicPartition(topic1, 0);
assertThat(committed.get().get(partition0).offset()).isEqualTo(2L);
assertThat(committed.get().get(new TopicPartition(topic1, 1))).isNull();
container.stop();
Consumer<Integer, String> consumer = cf.createConsumer();
final CountDownLatch subsLatch = new CountDownLatch(1);
Expand All @@ -564,8 +583,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
assertThat(subsLatch.await(1, TimeUnit.MILLISECONDS)).isTrue();
assertThat(records.count()).isEqualTo(0);
// depending on timing, the position might include the offset representing the commit in the log
assertThat(consumer.position(new TopicPartition(topic1, 0))).isGreaterThanOrEqualTo(1L);
assertThat(consumer.position(partition0)).isEqualTo(2L);
assertThat(transactionalId.get()).startsWith("rr.group.txTopic");
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
logger.info("Stop testRollbackRecord");
Expand Down
7 changes: 7 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2235,6 +2235,13 @@ The default executor creates threads named `<name>-C-n`; with the `KafkaMessageL
|`ALPHA`
|Exactly Once Semantics mode; see <<exactly-once>>.

|fixTxOffsets
|`false`
|When consuming records produced by a transactional producer, and the consumer is positioned at the end of a partition, the lag can incorrectly be reported as greater than zero, due to the pseudo record used to indicate transaction commit/rollback and, possibly, the presence of rolled-back records.
This does not functionally affect the consumer but some users have expressed concern that the "lag" is non-zero.
Set this property to `true` and the container will correct such mis-reported offsets.
The check is performed before the next poll to avoid adding significant complexity to the commit processing.

|groupId
|`null`
|Overrides the consumer `group.id` property; automatically set by the `@KafkaListener` `id` or `groupId` property.
Expand Down

0 comments on commit 80aa8fd

Please sign in to comment.