Kafka Outbound Endpoints - Add futuresChannel
Currently, the only way to block is to set `sync=true`, which waits for the future.
The problem with this approach is that it only supports one-at-a-time publication.

Add an option to send the send futures to a channel, allowing the application to
send multiple records and then wait on the futures later.

Also fix long-running test.

* Add docs.
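
For orientation, here is a minimal, hypothetical sketch of the usage pattern this enables (the bean names, topic, and surrounding class are illustrative, not taken from this commit; the committed documentation adds a fuller example to `kafka.adoc` below): send several records with a `FUTURE_TOKEN` header, then drain the send futures from the configured channel.

[source, java]
----
// A hypothetical configuration; bean names and the topic are illustrative only.
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.support.KafkaIntegrationHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

@Configuration
public class FuturesChannelSketch {

    @Bean
    PollableChannel futures() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow outbound(KafkaTemplate<String, String> template) {
        return f -> f
                .enrichHeaders(h -> h
                        // Route each record to a topic and tag it with a token used to
                        // correlate the send future delivered on the futures channel.
                        .header(KafkaHeaders.TOPIC, "someTopic")
                        .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
                .handle(Kafka.outboundChannelAdapter(template)
                        .futuresChannel("futures"));
    }

    // After publishing a batch of records, block until every send completes (or fails).
    public static void awaitSends(PollableChannel futures, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            Message<?> futureMessage = futures.receive(10_000);
            ((Future<?>) futureMessage.getPayload()).get(10, TimeUnit.SECONDS);
        }
    }

}
----

The token carried on each futures message lets the application match a `Future` back to the record that produced it.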
garyrussell committed Oct 16, 2020
1 parent 4027adc commit 9c6000e
Showing 7 changed files with 194 additions and 4 deletions.
@@ -353,6 +353,28 @@ public S sendFailureChannel(String sendFailureChannel) {
return _this();
}

/**
* Set the channel to which send futures are sent.
* @param futuresChannel the channel.
* @return the spec.
* @since 5.4
*/
public S futuresChannel(MessageChannel futuresChannel) {
this.target.setFuturesChannel(futuresChannel);
return _this();
}

/**
* Set the channel to which send futures are sent.
* @param futuresChannel the channel name.
* @return the spec.
* @since 5.4
*/
public S futuresChannel(String futuresChannel) {
this.target.setFuturesChannelName(futuresChannel);
return _this();
}

/**
* A {@link KafkaTemplate}-based {@link KafkaProducerMessageHandlerSpec} extension.
*
@@ -150,6 +150,10 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes

private String sendSuccessChannelName;

private MessageChannel futuresChannel;

private String futuresChannelName;

private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();

private Type replyPayloadType = Object.class;
@@ -338,6 +342,24 @@ public void setSendSuccessChannelName(String sendSuccessChannelName) {
this.sendSuccessChannelName = sendSuccessChannelName;
}

/**
* Set the futures channel.
* @param futuresChannel the futures channel.
* @since 5.4
*/
public void setFuturesChannel(MessageChannel futuresChannel) {
this.futuresChannel = futuresChannel;
}

/**
* Set the futures channel name.
* @param futuresChannelName the futures channel name.
* @since 5.4
*/
public void setFuturesChannelName(String futuresChannelName) {
this.futuresChannelName = futuresChannelName;
}

/**
* Set the error message strategy implementation to use when sending error messages after
* send failures. Cannot be null.
@@ -409,6 +431,17 @@ else if (this.sendSuccessChannelName != null) {
return null;
}

protected MessageChannel getFuturesChannel() {
if (this.futuresChannel != null) {
return this.futuresChannel;
}
else if (this.futuresChannelName != null) {
this.futuresChannel = getChannelResolver().resolveDestination(this.futuresChannelName);
return this.futuresChannel;
}
return null;
}

@Override
protected void doInit() {
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
@@ -447,6 +480,10 @@ protected Object handleRequestMessage(final Message<?> message) {
producerRecord.headers().remove(KafkaIntegrationHeaders.FLUSH);
}
}
Object futureToken = message.getHeaders().get(KafkaIntegrationHeaders.FUTURE_TOKEN);
if (futureToken != null) {
producerRecord.headers().remove(KafkaIntegrationHeaders.FUTURE_TOKEN);
}
ListenableFuture<SendResult<K, V>> sendFuture;
RequestReplyFuture<K, V, Object> gatewayFuture = null;
if (this.isGateway && (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) {
@@ -464,6 +501,10 @@ protected Object handleRequestMessage(final Message<?> message) {
sendFuture = this.kafkaTemplate.send(producerRecord);
}
}
sendFutureIfRequested(message, sendFuture, futureToken);
if (flush) {
this.kafkaTemplate.flush();
}
try {
processSendResult(message, producerRecord, sendFuture, getSendSuccessChannel());
}
@@ -474,12 +515,28 @@
catch (ExecutionException e) {
throw new MessageHandlingException(message, e.getCause()); // NOSONAR
}
if (flush) {
this.kafkaTemplate.flush();
}
return processReplyFuture(gatewayFuture);
}

private void sendFutureIfRequested(final Message<?> message, ListenableFuture<SendResult<K, V>> sendFuture,
Object futureToken) {

if (futureToken != null) {
MessageChannel futures = getFuturesChannel();
if (futures != null) {
try {
futures.send(getMessageBuilderFactory()
.withPayload(sendFuture)
.setHeader(KafkaIntegrationHeaders.FUTURE_TOKEN, futureToken)
.build());
}
catch (Exception e) {
this.logger.error(e, "Failed to send sendFuture");
}
}
}
}

@SuppressWarnings("unchecked")
private ProducerRecord<K, V> createProducerRecord(final Message<?> message) {
MessageHeaders messageHeaders = message.getHeaders();
@@ -540,7 +597,7 @@ else if (replyHeader != null) {
replyTopic = getSingleReplyTopic();
}
else {
throw new IllegalStateException("No reply topic header and no default reply topic is can be determined");
throw new IllegalStateException("No reply topic header and no default reply topic can be determined");
}
}
else {
@@ -36,4 +36,9 @@ private KafkaIntegrationHeaders() {
*/
public static final String FLUSH = KafkaHeaders.PREFIX + "flush";

/**
* Set to a token to correlate a send Future.
*/
public static final String FUTURE_TOKEN = KafkaHeaders.PREFIX + "futureToken";

}
@@ -22,10 +22,13 @@
import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
@@ -109,12 +112,16 @@ public void close(Duration timeout) {
@SuppressWarnings("unchecked")
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
given(pf.createProducer()).willReturn(mockProducer);
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10);
given(pf.getConfigurationProperties()).willReturn(props);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
KafkaProducerMessageHandler<Integer, String> handler = new KafkaProducerMessageHandler<>(template);
handler.setBeanFactory(mock(BeanFactory.class));
handler.afterPropertiesSet();

handler.setSync(true);
handler.setSendTimeout(10);
handler.setTopicExpression(new LiteralExpression("foo"));

Executors.newSingleThreadExecutor()
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

@@ -49,6 +50,7 @@
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.KafkaIntegrationHeaders;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
@@ -137,6 +139,9 @@ public class KafkaDslTests {
@Autowired
private PollableChannel errorChannel;

@Autowired
private PollableChannel futuresChannel;

@Autowired(required = false)
@Qualifier("topic1ListenerContainer")
private MessageListenerContainer messageListenerContainer;
@@ -164,6 +169,12 @@ void testKafkaAdapters() throws Exception {

assertThat(TestUtils.getPropertyValue(this.kafkaProducer1, "headerMapper")).isSameAs(this.mapper);

for (int i = 0; i < 200; i++) {
Message<?> future = this.futuresChannel.receive(10000);
assertThat(future).isNotNull();
((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
}

for (int i = 0; i < 100; i++) {
Message<?> receive = this.listeningFromKafkaResults1.receive(20000);
assertThat(receive).isNotNull();
@@ -327,10 +338,16 @@ public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public PollableChannel futuresChannel() {
return new QueueChannel();
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.<String>split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
.enrichHeaders(h -> h.header(KafkaIntegrationHeaders.FUTURE_TOKEN, "foo"))
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
@@ -354,6 +371,7 @@ public DefaultKafkaHeaderMapper mapper() {

return Kafka
.outboundChannelAdapter(producerFactory)
.futuresChannel("futuresChannel")
.sync(true)
.messageKey(m -> m
.getHeaders()
78 changes: 78 additions & 0 deletions src/reference/asciidoc/kafka.adoc
@@ -847,3 +847,81 @@ public IntegrationFlow flow() {
====

When an integration flow starts with an interface, the proxy that is created has the name of the flow bean, appended with ".gateway", so this bean name can be used as a `@Qualifier` if needed.
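
As an aside (not part of this commit's changes), a hypothetical injection point for such a proxy could look like the following, assuming a flow bean named `myFlow` whose gateway interface is `MyGateway`:

[source, java]
----
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

// Hypothetical gateway interface that starts the flow bean "myFlow".
interface MyGateway {

    String process(String in);

}

@Component
class GatewayClient {

    // The proxy created for the flow bean "myFlow" is registered as "myFlow.gateway".
    @Autowired
    @Qualifier("myFlow.gateway")
    MyGateway myGateway;

}
----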

[[read-process-write]]
=== Performance Considerations for read/process/write Scenarios

Many applications consume from a topic, perform some processing and write to another topic.
In most cases, if the write fails, the application would want to throw an exception so the incoming request can be retried and/or sent to a dead letter topic.
This functionality is supported by the underlying message listener container, together with a suitably configured error handler.
However, in order to support this, we need to block the listener thread until the success (or failure) of the write operation so that any exceptions can be thrown to the container.
When consuming single records, this is achieved by setting the `sync` property on the outbound adapter.
However, when consuming batches, using `sync` causes a significant performance degradation because the application would wait for the result of each send before sending the next message.
Starting with version 5.4, you can now perform multiple sends and then wait for the results of those sends afterwards.
This is achieved by adding a `futuresChannel` to the message handler.
To enable the feature, add a `KafkaIntegrationHeaders.FUTURE_TOKEN` header to the outbound messages; this header can then be used to correlate a `Future` with a particular sent message.
Here is an example of how you might use this feature:

====
[source, java]
----
@SpringBootApplication
public class FuturesChannelApplication {

    public static void main(String[] args) {
        SpringApplication.run(FuturesChannelApplication.class, args);
    }

    @Bean
    IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
        return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory, ListenerMode.batch, "inTopic"))
                .handle(handler)
                .get();
    }

    @Bean
    IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlows.from(Gate.class)
                .enrichHeaders(h -> h
                        .header(KafkaHeaders.TOPIC, "outTopic")
                        .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .futuresChannel("futures"))
                .get();
    }

    @Bean
    PollableChannel futures() {
        return new QueueChannel();
    }

}

@Component
@DependsOn("outbound")
class Handler {

    @Autowired
    Gate gate;

    @Autowired
    PollableChannel futures;

    public void handle(List<String> input) throws Exception {
        System.out.println(input);
        input.forEach(str -> this.gate.send(str.toUpperCase()));
        for (int i = 0; i < input.size(); i++) {
            Message<?> future = this.futures.receive(10000);
            ((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
        }
    }

}

interface Gate {

    void send(String out);

}
----
====
3 changes: 3 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
@@ -23,6 +23,9 @@ See <<./kafka.adoc#kafka,Spring for Apache Kafka Support>> for more information.
The `KafkaProducerMessageHandler` `sendTimeoutExpression` default has changed.
See <<./kafka.adoc#kafka-outbound,Kafka Outbound Channel Adapter>> for more information.

You can now access the `Future<?>` for underlying `send()` operations.
See <<./kafka.adoc#read-process-write>> for more information.

[[x5.4-r2dbc]]
==== R2DBC Channel Adapters

