GH-1118: Add RecordInterceptor
Resolves #1118

**cherry-pick to 2.2.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
#	spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
#	src/reference/asciidoc/whats-new.adoc
garyrussell authored and artembilan committed Jun 11, 2019
1 parent d80ade5 commit e487ece
Showing 8 changed files with 134 additions and 24 deletions.
spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
@@ -36,6 +36,7 @@
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.GenericErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
@@ -95,6 +96,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

private ReplyHeadersConfigurer replyHeadersConfigurer;

private RecordInterceptor<K, V> recordInterceptor;

/**
* Specify a {@link ConsumerFactory} to use.
* @param consumerFactory The consumer factory.
@@ -266,6 +269,16 @@ public ContainerProperties getContainerProperties() {
return this.containerProperties;
}

/**
* Set an interceptor to be called before calling the listener.
* Does not apply to batch listeners.
* @param recordInterceptor the interceptor.
* @since 2.2.7
*/
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}

@Override
public void afterPropertiesSet() {
if (this.errorHandler != null) {
@@ -363,6 +376,7 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
else if (this.autoStartup != null) {
instance.setAutoStartup(this.autoStartup);
}
instance.setRecordInterceptor(this.recordInterceptor);
if (this.phase != null) {
instance.setPhase(this.phase);
}
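For context, a minimal sketch of how an application might wire the new setter into a `ConcurrentKafkaListenerContainerFactory` bean; the class name, bean name, generic types, and injected `ConsumerFactory` are illustrative assumptions rather than part of this commit:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class InterceptorFactoryConfig {

    // Hypothetical factory bean: names and types are assumptions for illustration.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // RecordInterceptor is a functional interface; the lambda runs before each
        // record is handed to the listener (record listeners only, not batch).
        factory.setRecordInterceptor(record -> {
            // inspect or modify the record here; returning null skips it
            return record;
        });
        return factory;
    }

}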
spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
@@ -81,6 +81,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
private AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor =
new DefaultAfterRollbackProcessor<>();

private RecordInterceptor<K, V> recordInterceptor;

private volatile boolean running = false;

private volatile boolean paused;
@@ -261,6 +263,20 @@ public String getListenerId() {
return this.beanName; // the container factory sets the bean name to the id attribute
}

protected RecordInterceptor<K, V> getRecordInterceptor() {
return this.recordInterceptor;
}

/**
* Set an interceptor to be called before calling the listener.
* Does not apply to batch listeners.
* @param recordInterceptor the interceptor.
* @since 2.2.7
*/
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}

@Override
public void setupMessageListener(Object messageListener) {
this.containerProperties.setMessageListener(messageListener);
spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java
@@ -161,6 +161,7 @@ protected void doStart() {
container.setClientIdSuffix("-" + i);
container.setGenericErrorHandler(getGenericErrorHandler());
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
container.setRecordInterceptor(getRecordInterceptor());
container.setEmergencyStop(() -> {
stop(() -> {
// NOSONAR
spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -465,6 +465,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final boolean checkNullValueForExceptions;

private final RecordInterceptor<K, V> recordInterceptor = getRecordInterceptor();

private Map<TopicPartition, OffsetMetadata> definedPartitions;

private volatile Collection<TopicPartition> assignedPartitions;
@@ -1257,26 +1259,37 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record,
ackCurrent(record, producer);
}

private void doInvokeOnMessage(final ConsumerRecord<K, V> record) {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null, this.consumer);
break;
case CONSUMER_AWARE:
this.listener.onMessage(record, this.consumer);
break;
case ACKNOWLEDGING:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null);
break;
case SIMPLE:
this.listener.onMessage(record);
break;
private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
ConsumerRecord<K, V> record = recordArg;
if (this.recordInterceptor != null) {
record = this.recordInterceptor.intercept(record);
}
if (record == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("RecordInterceptor returned null, skipping: " + recordArg);
}
}
else {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null, this.consumer);
break;
case CONSUMER_AWARE:
this.listener.onMessage(record, this.consumer);
break;
case ACKNOWLEDGING:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null);
break;
case SIMPLE:
this.listener.onMessage(record);
break;
}
}
}

spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java
@@ -0,0 +1,46 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.lang.Nullable;

/**
* An interceptor for {@link ConsumerRecord} invoked by the listener
* container before invoking the listener.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
* @since 2.2.7
*
*/
@FunctionalInterface
public interface RecordInterceptor<K, V> {

/**
* Perform some action on the record or return a different one.
* If null is returned the record will be skipped.
* @param record the record.
* @return the record or null.
*/
@Nullable
ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);

}
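As a usage illustration (not part of the commit), a sketch of an interceptor that drops records with empty values and passes everything else through; the class name and the choice of filter condition are assumptions:

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.RecordInterceptor;

public class SkipEmptyValueInterceptor implements RecordInterceptor<String, String> {

    @Override
    public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record) {
        if (record.value() == null || record.value().isEmpty()) {
            return null; // per the contract above, a null return skips the record
        }
        return record; // otherwise pass the record through unchanged
    }

}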
spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
@@ -740,14 +740,19 @@ public void testKeyConversion() throws Exception {
this.bytesKeyTemplate.send("annotated36", "foo".getBytes(), "bar");
assertThat(this.listener.keyLatch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.convertedKey).isEqualTo("foo");
assertThat(this.config.intercepted).isTrue();
}

@Configuration
@EnableKafka
@EnableTransactionManagement(proxyTargetClass = true)
public static class Config implements KafkaListenerConfigurer {

private final CountDownLatch spyLatch = new CountDownLatch(2);
final CountDownLatch spyLatch = new CountDownLatch(2);

volatile Throwable globalErrorThrowable;

volatile boolean intercepted;

@Bean
public static PropertySourcesPlaceholderConfigurer ppc() {
@@ -770,8 +775,6 @@ public ChainedKafkaTransactionManager<Integer, String> cktm() {
return new ChainedKafkaTransactionManager<>(ktm(), transactionManager());
}

private Throwable globalErrorThrowable;

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
@@ -857,6 +860,10 @@ public KafkaListenerContainerFactory<?> bytesStringListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<byte[], String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(bytesStringConsumerFactory());
factory.setRecordInterceptor(record -> {
this.intercepted = true;
return record;
});
return factory;
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java
@@ -112,11 +112,13 @@ public void testAutoCommit() throws Exception {
ContainerProperties containerProps = new ContainerProperties(topic1);
containerProps.setLogContainerConfig(true);

final CountDownLatch latch = new CountDownLatch(4);
final CountDownLatch latch = new CountDownLatch(3);
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
final List<String> payloads = new ArrayList<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
payloads.add(message.value());
latch.countDown();
});

@@ -132,6 +134,11 @@ public void testAutoCommit() throws Exception {
stopLatch.countDown();
}
});
CountDownLatch intercepted = new CountDownLatch(4);
container.setRecordInterceptor(record -> {
intercepted.countDown();
return record.value().equals("baz") ? null : record;
});
container.start();

ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
@@ -146,6 +153,7 @@ public void testAutoCommit() throws Exception {
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertThat(intercepted.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
for (String threadName : listenerThreadNames) {
assertThat(threadName).contains("-C-");
@@ -161,6 +169,7 @@ public void testAutoCommit() throws Exception {
Set<KafkaMessageListenerContainer<Integer, String>> children = new HashSet<>(containers);
container.stop();
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(payloads).containsExactlyInAnyOrder("foo", "bar", "qux");
events.forEach(e -> {
assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container);
if (e instanceof ContainerStoppedEvent) {
4 changes: 4 additions & 0 deletions src/reference/asciidoc/kafka.adoc
@@ -642,6 +642,10 @@ Two `MessageListenerContainer` implementations are provided:
The `KafkaMessageListenerContainer` receives all messages from all topics or partitions on a single thread.
The `ConcurrentMessageListenerContainer` delegates to one or more `KafkaMessageListenerContainer` instances to provide multi-threaded consumption.

Starting with version 2.2.7, you can add a `RecordInterceptor` to the listener container; it will be invoked before calling the listener, allowing inspection or modification of the record.
If the interceptor returns null, the listener is not called.
The interceptor is not invoked when the listener is a <<batch-listeners, batch listener>>.

[[kafka-container]]
====== Using `KafkaMessageListenerContainer`

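To round out the documentation change, a sketch of setting the interceptor directly on a container, mirroring the `testAutoCommit()` change above; the wrapper class, topic name, and injected `consumerFactory` are assumptions:

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

public class InterceptingContainerExample {

    public KafkaMessageListenerContainer<Integer, String> container(
            ConsumerFactory<Integer, String> consumerFactory) {

        ContainerProperties containerProps = new ContainerProperties("someTopic");
        containerProps.setMessageListener((MessageListener<Integer, String>) record ->
                System.out.println("received: " + record.value()));

        KafkaMessageListenerContainer<Integer, String> container =
                new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
        // As in the test above, records whose value is "baz" are dropped before the listener runs.
        container.setRecordInterceptor(record -> "baz".equals(record.value()) ? null : record);
        container.start();
        return container;
    }

}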
