Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
GH-212: Add ConsumerSeekAware impl to Inbounds (#213)
Browse files Browse the repository at this point in the history
* GH-212: Add ConsumerSeekAware impl to Inbounds

Fixes #212

**Cherry-pick to 3.0.x**

* GH-212: Add ConsumerSeekAware impl to Inbounds

Fixes #212

* Introduce a new `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK`
header to be populated to messages for sending to the channel
* Populate that header from the `KafkaInboundGateway` and
`KafkaMessageDrivenChannelAdapter` into the message from the
`seekCallBack` property if `ListenerContainer` is single-threaded or
from the `ThreadLocal<ConsumerSeekAware.ConsumerSeekCallback>` otherwise;
and only if newly introduced `setAdditionalHeaders` is `true`
* Populate `seekCallBack` property or `ThreadLocal<?>` from the
`registerSeekCallback()` implementation from the internal listeners
* Add `setOnPartitionsAssignedSeekCallback(BiConsumer)` and
`setOnIdleSeekCallback(BiConsumer)` options to react for the appropriate
event from the underlying container and perform appropriate seek management
* Add new options to the DSL classes and cover them with tests, including
check for new `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK` header

**Cherry-pick to 3.0.x**

* Address PR comments: remove unnecessary API

* *Polishing `setOnPartitionsAssignedSeekCallback()` JavaDocs
*Close producers in the `KafkaProducerMessageHandlerTests`
  • Loading branch information
artembilan authored and garyrussell committed Sep 7, 2018
1 parent 120810e commit cecb661
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

import java.util.Collections;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.apache.kafka.common.TopicPartition;

import org.springframework.integration.dsl.ComponentsRegistration;
import org.springframework.integration.dsl.MessagingGatewaySpec;
import org.springframework.integration.kafka.inbound.KafkaInboundGateway;
import org.springframework.integration.support.ObjectStringMapBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
Expand All @@ -41,6 +45,7 @@
* @param <S> the target {@link KafkaInboundGatewaySpec} implementation type.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 3.0.2
*/
Expand Down Expand Up @@ -91,6 +96,20 @@ public S recoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
return _this();
}

/**
* Specify a {@link BiConsumer} for seeks management during
* {@link ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)}
* call from the {@link org.springframework.kafka.listener.KafkaMessageListenerContainer}.
* @param onPartitionsAssignedCallback the {@link BiConsumer} to use
* @return the spec
* @since 3.0.4
*/
public S onPartitionsAssignedSeekCallback(
BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
this.target.setOnPartitionsAssignedSeekCallback(onPartitionsAssignedCallback);
return _this();
}

@Override
public Map<Object, String> getComponentsToRegister() {
return Collections.singletonMap(this.container, getId() == null ? null : getId() + ".container");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@

import java.util.Collections;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.apache.kafka.common.TopicPartition;

import org.springframework.integration.dsl.ComponentsRegistration;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
Expand Down Expand Up @@ -156,6 +160,20 @@ public S filterInRetry(boolean filterInRetry) {
return _this();
}

/**
* Specify a {@link BiConsumer} for seeks management during
* {@link ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)}
* call from the {@link org.springframework.kafka.listener.KafkaMessageListenerContainer}.
* @param onPartitionsAssignedCallback the {@link BiConsumer} to use
* @return the spec
* @since 3.0.4
*/
public S onPartitionsAssignedSeekCallback(
BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
this.target.setOnPartitionsAssignedSeekCallback(onPartitionsAssignedCallback);
return _this();
}

@Override
public Map<Object, String> getComponentsToRegister() {
return Collections.singletonMap(this.container, getId() == null ? null : getId() + ".container");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

package org.springframework.integration.kafka.inbound;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
Expand All @@ -32,6 +35,7 @@
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
Expand All @@ -58,6 +62,7 @@
* @param <R> the reply value type.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 3.0.2
*
Expand All @@ -76,6 +81,8 @@ public class KafkaInboundGateway<K, V, R> extends MessagingGatewaySupport implem

private RecoveryCallback<? extends Object> recoveryCallback;

private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;

/**
* Construct an instance with the provided container.
* @param messageListenerContainer the container.
Expand Down Expand Up @@ -133,6 +140,21 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
this.recoveryCallback = recoveryCallback;
}

/**
* Specify a {@link BiConsumer} for seeks management during
* {@link ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)}
* call from the {@link org.springframework.kafka.listener.KafkaMessageListenerContainer}.
* This is called from the internal
* {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter} implementation.
* @param onPartitionsAssignedCallback the {@link BiConsumer} to use
* @since 3.0.4
* @see ConsumerSeekAware#onPartitionsAssigned
*/
public void setOnPartitionsAssignedSeekCallback(
BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
}

@Override
protected void onInit() throws Exception {
super.onInit();
Expand Down Expand Up @@ -212,6 +234,13 @@ private class IntegrationRecordMessageListener extends RecordMessagingMessageLis
super(null, null);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
if (KafkaInboundGateway.this.onPartitionsAssignedSeekCallback != null) {
KafkaInboundGateway.this.onPartitionsAssignedSeekCallback.accept(assignments, callback);
}
}

@Override
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
Message<?> message = null;
Expand Down Expand Up @@ -254,7 +283,7 @@ private Message<?> addDeliveryAttemptHeader(Message<?> message) {
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
if (message.getHeaders() instanceof KafkaMessageHeaders) {
((KafkaMessageHeaders) message.getHeaders()).getRawHeaders()
.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
}
else {
messageToReturn = MessageBuilder.fromMessage(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package org.springframework.integration.kafka.inbound;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
Expand All @@ -33,6 +36,7 @@
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringBatchMessageListenerAdapter;
Expand Down Expand Up @@ -90,6 +94,8 @@ public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSuppo

private boolean filterInRetry;

private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;

/**
* Construct an instance with mode {@link ListenerMode#record}.
* @param messageListenerContainer the container.
Expand Down Expand Up @@ -225,6 +231,21 @@ public void setPayloadType(Class<?> payloadType) {
this.batchListener.setFallbackType(payloadType);
}

/**
* Specify a {@link BiConsumer} for seeks management during
* {@link ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)}
* call from the {@link org.springframework.kafka.listener.KafkaMessageListenerContainer}.
* This is called from the internal
* {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter} implementation.
* @param onPartitionsAssignedCallback the {@link BiConsumer} to use
* @since 3.0.4
* @see ConsumerSeekAware#onPartitionsAssigned
*/
public void setOnPartitionsAssignedSeekCallback(
BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
}

@Override
public String getComponentType() {
return "kafka:message-driven-channel-adapter";
Expand Down Expand Up @@ -342,6 +363,23 @@ protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
}
}

private void sendMessageIfAny(Message<?> message, Object kafkaConsumedObject) {
if (message != null) {
try {
sendMessage(message);
}
finally {
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate == null) {
attributesHolder.remove();
}
}
}
else {
KafkaMessageDrivenChannelAdapter.this.logger.debug("Converter returned a null message for: "
+ kafkaConsumedObject);
}
}

/**
* The listener mode for the container, record or batch.
* @since 1.2
Expand All @@ -368,6 +406,13 @@ private class IntegrationRecordMessageListener extends RecordMessagingMessageLis
super(null, null);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
if (KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback != null) {
KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback.accept(assignments, callback);
}
}

@Override
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
Message<?> message = null;
Expand All @@ -382,20 +427,8 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
RuntimeException exception = new ConversionException("Failed to convert to message for: " + record, e);
sendErrorMessageIfNecessary(null, exception);
}
if (message != null) {
try {
sendMessage(message);
}
finally {
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate == null) {
attributesHolder.remove();
}
}
}
else {
KafkaMessageDrivenChannelAdapter.this.logger.debug("Converter returned a null message for: "
+ record);
}

sendMessageIfAny(message, record);
}

private Message<?> addDeliveryAttemptHeader(Message<?> message) {
Expand All @@ -404,7 +437,7 @@ private Message<?> addDeliveryAttemptHeader(Message<?> message) {
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
if (message.getHeaders() instanceof KafkaMessageHeaders) {
((KafkaMessageHeaders) message.getHeaders()).getRawHeaders()
.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
}
else {
messageToReturn = MessageBuilder.fromMessage(message)
Expand Down Expand Up @@ -444,8 +477,15 @@ private class IntegrationBatchMessageListener extends BatchMessagingMessageListe
}

@Override
public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
if (KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback != null) {
KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback.accept(assignments, callback);
}
}

@Override
public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

Message<?> message = null;
try {
Expand All @@ -458,20 +498,8 @@ public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowl
getMessagingTemplate().send(getErrorChannel(), new ErrorMessage(exception));
}
}
if (message != null) {
try {
sendMessage(message);
}
finally {
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate == null) {
attributesHolder.remove();
}
}
}
else {
KafkaMessageDrivenChannelAdapter.this.logger.debug("Converter returned a null message for: "
+ records);
}

sendMessageIfAny(message, records);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ public void testKafkaAdapters() throws Exception {
this.kafkaTemplateTopic1.send(TEST_TOPIC3, "foo");
assertThat(this.config.sourceFlowLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.fromSource).isEqualTo("foo");

assertThat(this.config.onPartitionsAssignedCalledLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
Expand All @@ -226,6 +228,8 @@ public static class ContextConfiguration {

private final CountDownLatch replyContainerLatch = new CountDownLatch(1);

private final CountDownLatch onPartitionsAssignedCalledLatch = new CountDownLatch(1);

private Object fromSource;

@Bean
Expand All @@ -247,11 +251,14 @@ public IntegrationFlow topic1ListenerFromKafkaFlow() {
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer(c ->
c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
.idleEventInterval(100L)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filterInRetry(true)
.onPartitionsAssignedSeekCallback((map, callback) ->
ContextConfiguration.this.onPartitionsAssignedCalledLatch.countDown()))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
Expand Down
Loading

0 comments on commit cecb661

Please sign in to comment.