GH-1587: Fix NPE with Foreign TM and fixTxOffsets
Resolves #1587

The code was checking for the presence of a transaction template instead of the KafkaTransactionManager (KTM). A foreign transaction manager also has a transaction template, so the container wrongly took the Kafka-transactional commit path and threw an NPE when fixing the offsets.

**cherry-pick to 2.5.x**
garyrussell authored and artembilan committed Nov 5, 2020
1 parent 9bb4334 commit 620c1a6
Showing 4 changed files with 42 additions and 18 deletions.
@@ -410,9 +410,10 @@ public boolean isFixTxOffsets() {
 	 * functionally affect the consumer but some users have expressed concern that the
 	 * "lag" is non-zero. Set this to true and the container will correct such
 	 * mis-reported offsets. The check is performed before the next poll to avoid adding
-	 * significant complexity to the commit processing. IMPORTANT: The lag will only be
-	 * corrected if the consumer is configured with
-	 * {@code isolation.level=read_committed}.
+	 * significant complexity to the commit processing. IMPORTANT: At the time of writing,
+	 * the lag will only be corrected if the consumer is configured with
+	 * {@code isolation.level=read_committed} and {@code max.poll.records} is greater than
+	 * 1. See https://issues.apache.org/jira/browse/KAFKA-10683 for more information.
	 * @param fixTxOffsets true to correct the offset(s).
	 * @since 2.5.6
	 */
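
As a usage note for the updated javadoc: a runnable sketch of a container that satisfies both preconditions. The broker address, topic, group id, and max.poll.records value are assumptions for illustration, not part of this commit.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

public class FixTxOffsetsExample {

	public static void main(String[] args) {
		Map<String, Object> consumerProps = new HashMap<>();
		consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
		consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "fixLagGroup"); // assumed group id
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		// Both preconditions from the javadoc above: read_committed isolation
		// and max.poll.records greater than 1 (see KAFKA-10683).
		consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
		consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
		ContainerProperties containerProps = new ContainerProperties("someTopic"); // assumed topic
		containerProps.setFixTxOffsets(true); // the property documented above
		containerProps.setMessageListener((MessageListener<Integer, String>) record -> { });
		new KafkaMessageListenerContainer<>(cf, containerProps).start();
	}

}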
@@ -1186,7 +1186,7 @@ private void fixTxOffsetsIfNeeded() {
 			});
 			if (toFix.size() > 0) {
 				this.logger.debug(() -> "Fixing TX offsets: " + toFix);
-				if (this.transactionTemplate == null) {
+				if (this.kafkaTxManager == null) {
 					if (this.syncCommits) {
 						commitSync(toFix);
 					}
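
In other words, the corrected guard keys off the KafkaTransactionManager itself rather than the transaction template: with a foreign (e.g. JDBC or JTA) transaction manager the transactionTemplate is non-null but kafkaTxManager is null, so the old check wrongly took the transactional-producer branch and hit the NPE this commit fixes. A hedged, simplified sketch of the corrected decision (field names follow the diff; transactionalProducer() is a hypothetical helper, not the real container internals):

	// Simplified sketch - not the actual KafkaMessageListenerContainer code.
	private void fixTxOffsets(Map<TopicPartition, OffsetAndMetadata> toFix) {
		if (this.kafkaTxManager == null) { // was: this.transactionTemplate == null
			// No Kafka TM (including the foreign-TM case): plain consumer commits.
			if (this.syncCommits) {
				this.consumer.commitSync(toFix);
			}
			else {
				this.consumer.commitAsync(toFix, null);
			}
		}
		else {
			// A Kafka TM is present: commit the corrected offsets inside a Kafka transaction.
			this.transactionTemplate.executeWithoutResult(status -> transactionalProducer()
					.sendOffsetsToTransaction(toFix, this.consumer.groupMetadata()));
		}
	}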
@@ -118,7 +118,7 @@
  */
 @EmbeddedKafka(topics = { TransactionalContainerTests.topic1, TransactionalContainerTests.topic2,
 		TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
-		TransactionalContainerTests.topic5 },
+		TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7 },
 		brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
 public class TransactionalContainerTests {
 
@@ -136,6 +136,10 @@ public class TransactionalContainerTests {
 
 	public static final String topic5 = "txTopic5";
 
+	public static final String topic6 = "txTopic6";
+
+	public static final String topic7 = "txTopic7";
+
 	private static EmbeddedKafkaBroker embeddedKafka;
 
 	@BeforeAll
@@ -597,55 +601,73 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 		consumer.close();
 	}
 
-	@SuppressWarnings("unchecked")
 	@Test
-	public void testFixLag() throws Exception {
+	public void testFixLag() throws InterruptedException {
+		testFixLagGuts(topic5, 0);
+	}
+
+	@Test
+	public void testFixLagKTM() throws InterruptedException {
+		testFixLagGuts(topic6, 1);
+	}
+
+	@Test
+	public void testFixLagOtherTM() throws InterruptedException {
+		testFixLagGuts(topic7, 2);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void testFixLagGuts(String topic, int whichTm) throws InterruptedException {
 		logger.info("Start testFixLag");
 		Map<String, Object> props = KafkaTestUtils.consumerProps("txTest2", "false", embeddedKafka);
 		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
-		ContainerProperties containerProps = new ContainerProperties(topic5);
+		ContainerProperties containerProps = new ContainerProperties(topic);
 		containerProps.setGroupId("txTest2");
 		containerProps.setPollTimeout(500L);
 		containerProps.setIdleEventInterval(500L);
 		containerProps.setFixTxOffsets(true);
 
 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
 		senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
 		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
 		pf.setTransactionIdPrefix("fl.");
+		switch (whichTm) {
+			case 0:
+				break;
+			case 1:
+				containerProps.setTransactionManager(new KafkaTransactionManager<>(pf));
+				break;
+			case 2:
+				containerProps.setTransactionManager(new SomeOtherTransactionManager());
+		}
 
 		final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
 		final CountDownLatch latch = new CountDownLatch(1);
 		final AtomicReference<String> transactionalId = new AtomicReference<>();
 		containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
 		});
 
-		@SuppressWarnings({ "rawtypes" })
-		KafkaTransactionManager tm = new KafkaTransactionManager(pf);
-		containerProps.setTransactionManager(tm);
 		KafkaMessageListenerContainer<Integer, String> container =
 				new KafkaMessageListenerContainer<>(cf, containerProps);
 		container.setBeanName("testRollbackRecord");
 		AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
 		container.setApplicationEventPublisher(event -> {
 			if (event instanceof ListenerContainerIdleEvent) {
 				Consumer<?, ?> consumer = ((ListenerContainerIdleEvent) event).getConsumer();
-				committed.set(consumer.committed(Set.of(new TopicPartition(topic5, 0))));
-				if (committed.get().get(new TopicPartition(topic5, 0)) != null) {
+				committed.set(consumer.committed(Set.of(new TopicPartition(topic, 0))));
+				if (committed.get().get(new TopicPartition(topic, 0)) != null) {
 					latch.countDown();
 				}
 			}
 		});
 		container.start();
 
-		template.setDefaultTopic(topic5);
+		template.setDefaultTopic(topic);
 		template.executeInTransaction(t -> {
 			template.sendDefault(0, 0, "foo");
 			return null;
 		});
 		assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
-		TopicPartition partition0 = new TopicPartition(topic5, 0);
+		TopicPartition partition0 = new TopicPartition(topic, 0);
 		assertThat(committed.get().get(partition0).offset()).isEqualTo(2L);
 		container.stop();
 		pf.destroy();
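
The new testFixLagOtherTM variant exercises the reported scenario: fixTxOffsets combined with a transaction manager that is not Kafka's. A hedged configuration sketch follows; the topic, group id, and dataSource variable are placeholders, not from this commit. After this fix, such a container falls back to plain consumer commits instead of throwing an NPE.

	ContainerProperties containerProps = new ContainerProperties("someTopic"); // placeholder topic
	containerProps.setGroupId("someGroup"); // placeholder group id
	containerProps.setFixTxOffsets(true);
	containerProps.setMessageListener((MessageListener<Integer, String>) record -> { });
	// A foreign (non-Kafka) transaction manager, e.g. one for a relational
	// database; dataSource is a placeholder javax.sql.DataSource.
	containerProps.setTransactionManager(new DataSourceTransactionManager(dataSource));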
3 changes: 2 additions & 1 deletion src/reference/asciidoc/kafka.adoc
@@ -2300,7 +2300,8 @@ The default executor creates threads named `<name>-C-n`; with the `KafkaMessageL
 This does not functionally affect the consumer but some users have expressed concern that the "lag" is non-zero.
 Set this property to `true` and the container will correct such mis-reported offsets.
 The check is performed before the next poll to avoid adding significant complexity to the commit processing.
-The lag will only be corrected if the consumer is configured with `isolation.level=read_committed`.
+At the time of writing, the lag will only be corrected if the consumer is configured with `isolation.level=read_committed` and `max.poll.records` is greater than 1.
+See https://issues.apache.org/jira/browse/KAFKA-10683[KAFKA-10683] for more information.
 
 |groupId
 |`null`
