Skip to content

Commit

Permalink
Prepare for GH-1480
Browse files Browse the repository at this point in the history
Deprecate existing ARP method ready for the new method in 2.7.

**Merge to 2.6.x only**
  • Loading branch information
garyrussell authored and artembilan committed Jan 15, 2021
1 parent 5609205 commit f736bd9
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.lang.Nullable;

/**
* Invoked by a listener container with remaining, unprocessed, records
Expand Down Expand Up @@ -58,7 +59,7 @@ public interface AfterRollbackProcessor<K, V> {
* @since 2.2
* @see #isProcessInTransaction()
* @deprecated in favor of
* {@link #process(List, Consumer, Exception, boolean, ContainerProperties.EOSMode)}.
* {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)}.
*/
@Deprecated
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception, boolean recoverable);
Expand All @@ -81,14 +82,42 @@ public interface AfterRollbackProcessor<K, V> {
* @param recoverable the recoverable.
* @param eosMode the {@link EOSMode}.
* @since 2.5.3
* @deprecated in favor of
* {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, EOSMode)}.
* @see #isProcessInTransaction()
*/
@Deprecated
default void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
boolean recoverable, EOSMode eosMode) {

process(records, consumer, exception, recoverable);
}

/**
* Process the remaining records. Recoverable will be true if the container is
* processing individual records; this allows the processor to recover (skip) the
* failed record rather than re-seeking it. This is not possible with a batch listener
* since only the listener itself knows which record in the batch keeps failing.
* IMPORTANT: If invoked in a transaction when the listener was invoked with a single
* record, the transaction id will be based on the container group.id and the
* topic/partition of the failed record, to avoid issues with zombie fencing. So,
* generally, only its offset should be sent to the transaction. For other behavior
* the process method should manage its own transaction.
* @param records the records.
* @param consumer the consumer.
* @param container the container or parent container.
* @param exception the exception
* @param recoverable the recoverable.
* @param eosMode the {@link EOSMode}.
* @since 2.6.6
* @see #isProcessInTransaction()
*/
default void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

process(records, consumer, exception, recoverable, eosMode);
}

/**
* Optional method to clear thread state; will be called just before a consumer
* thread terminates.
Expand All @@ -100,13 +129,13 @@ default void clearThreadState() {

/**
* Return true to invoke
* {@link #process(List, Consumer, Exception, boolean, ContainerProperties.EOSMode)}
* {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)}
* in a new transaction. Because the container cannot infer the desired behavior, the
* processor is responsible for sending the offset to the transaction if it decides to
* skip the failing record.
* @return true to run in a transaction; default false.
* @since 2.2.5
* @see #process(List, Consumer, Exception, boolean, ContainerProperties.EOSMode)
* @see #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)
*/
default boolean isProcessInTransaction() {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -141,12 +141,13 @@ private void checkConfig() {
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
boolean recoverable) {

process(records, consumer, exception, recoverable, EOSMode.ALPHA);
process(records, consumer, null, exception, recoverable, EOSMode.ALPHA);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
@Nullable MessageListenerContainer container, Exception exception,
boolean recoverable, @Nullable EOSMode eosMode) {

if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
Expand Down Expand Up @@ -177,15 +178,15 @@ public boolean isProcessInTransaction() {

/**
* {@inheritDoc} Set to true and the container will run the
* {@link #process(List, Consumer, Exception, boolean, ContainerProperties.EOSMode)}
* {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)}
* method in a transaction and, if a record is skipped and recovered, we will send its
* offset to the transaction. Requires a {@link KafkaOperations}.
* @param commitRecovered true to process in a transaction.
* @since 2.3
* @deprecated in favor of
* {@link #DefaultAfterRollbackProcessor(BiConsumer, BackOff, KafkaOperations, boolean)}.
* @see #isProcessInTransaction()
* @see #process(List, Consumer, Exception, boolean, ContainerProperties.EOSMode)
* @see #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)
*/
@Deprecated
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1615,12 +1615,14 @@ private void batchAfterRollback(final ConsumerRecords<K, V> records,
RuntimeException rollbackException = decorateException(e);
try {
if (recordList == null) {
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, rollbackException,
false, this.eosMode);
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer,
rollbackException, false, this.eosMode);
}
else {
afterRollbackProcessorToUse.process(recordList, this.consumer, rollbackException, false,
this.eosMode);
afterRollbackProcessorToUse.process(recordList, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer,
rollbackException, false, this.eosMode);
}
}
catch (KafkaException ke) {
Expand Down Expand Up @@ -1891,15 +1893,17 @@ private void recordAfterRollback(Iterator<ConsumerRecord<K, V>> iterator, final

@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
afterRollbackProcessorToUse.process(unprocessed, ListenerConsumer.this.consumer, e, true,
afterRollbackProcessorToUse.process(unprocessed, ListenerConsumer.this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer, e, true,
ListenerConsumer.this.eosMode);
}

});
}
else {
try {
afterRollbackProcessorToUse.process(unprocessed, this.consumer, e, true, this.eosMode);
afterRollbackProcessorToUse.process(unprocessed, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer, e, true, this.eosMode);
}
catch (KafkaException ke) {
ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,18 +74,18 @@ void testClassifier() {
@SuppressWarnings("unchecked")
Consumer<String, String> consumer = mock(Consumer.class);
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
processor.process(records, consumer, illegalState, true, EOSMode.ALPHA);
processor.process(records, consumer, new DeserializationException("intended", null, false, illegalState), true,
EOSMode.ALPHA);
processor.process(records, consumer, null, illegalState, true, EOSMode.ALPHA);
processor.process(records, consumer, null, new DeserializationException("intended", null, false, illegalState),
true, EOSMode.ALPHA);
verify(template).sendOffsetsToTransaction(anyMap());
verify(template, never()).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class));
assertThat(recovered.get()).isSameAs(record1);
processor.addNotRetryableExceptions(IllegalStateException.class);
recovered.set(null);
recovererShouldFail.set(true);
processor.process(records, consumer, illegalState, true, EOSMode.ALPHA);
processor.process(records, consumer, null, illegalState, true, EOSMode.ALPHA);
verify(template, times(1)).sendOffsetsToTransaction(anyMap()); // recovery failed
processor.process(records, consumer, illegalState, true, EOSMode.BETA);
processor.process(records, consumer, null, illegalState, true, EOSMode.BETA);
verify(template, times(1)).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class));
assertThat(recovered.get()).isSameAs(record1);
InOrder inOrder = inOrder(consumer);
Expand Down Expand Up @@ -120,12 +120,12 @@ void testBatchBackOff() {
@SuppressWarnings("unchecked")
Consumer<String, String> consumer = mock(Consumer.class);
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
processor.process(records, consumer, illegalState, false, EOSMode.BETA);
processor.process(records, consumer, illegalState, false, EOSMode.BETA);
processor.process(records, consumer, null, illegalState, false, EOSMode.BETA);
processor.process(records, consumer, null, illegalState, false, EOSMode.BETA);
verify(backOff, times(2)).start();
verify(execution.get(), times(2)).nextBackOff();
processor.clearThreadState();
processor.process(records, consumer, illegalState, false, EOSMode.BETA);
processor.process(records, consumer, null, illegalState, false, EOSMode.BETA);
verify(backOff, times(3)).start();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -810,7 +810,8 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
verify(afterRollbackProcessor, times(4)).isProcessInTransaction();
ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
verify(afterRollbackProcessor, never()).process(any(), any(), captor.capture(), anyBoolean());
verify(afterRollbackProcessor, times(4)).process(any(), any(), captor.capture(), anyBoolean(), any());
verify(afterRollbackProcessor, never()).process(any(), any(), captor.capture(), anyBoolean(), any());
verify(afterRollbackProcessor, times(4)).process(any(), any(), any(), captor.capture(), anyBoolean(), any());
assertThat(captor.getValue()).isInstanceOf(ListenerExecutionFailedException.class)
.extracting(ex -> ((ListenerExecutionFailedException) ex).getGroupId())
.isEqualTo("groupInARBP");
Expand Down Expand Up @@ -943,7 +944,7 @@ void testNoAfterRollbackWhenFenced() throws Exception {
inOrder.verifyNoMoreInteractions();
assertThat(deliveryCount.get()).isEqualTo(1);

verify(arp, never()).process(any(), any(), any(), anyBoolean(), any());
verify(arp, never()).process(any(), any(), any(), any(), anyBoolean(), any());

assertThat(KafkaTestUtils.getPropertyValue(container,
"listenerConsumer.transactionTemplate.timeout", Integer.class))
Expand Down

0 comments on commit f736bd9

Please sign in to comment.