Skip to content

Commit

Permalink
GH-731: Replying template improvements
Browse files Browse the repository at this point in the history
Resolves #731

- Auto detect reply topic/partition
- Improve docs to explain how to use a shared reply topic
- Add boolean to reduce log level for unexpected responses

* Polishing - PR Comments
  • Loading branch information
garyrussell authored and artembilan committed Jul 10, 2018
1 parent 62a6f87 commit 82322d4
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 12 deletions.
Expand Up @@ -17,6 +17,7 @@
package org.springframework.kafka.requestreply;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Collection;
import java.util.Iterator;
Expand All @@ -29,6 +30,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.beans.factory.DisposableBean;
Expand All @@ -38,6 +40,7 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
Expand Down Expand Up @@ -66,6 +69,10 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen

private final ConcurrentMap<CorrelationKey, RequestReplyFuture<K, V, R>> futures = new ConcurrentHashMap<>();

private final byte[] replyTopic;

private final byte[] replyPartition;

private TaskScheduler scheduler = new ThreadPoolTaskScheduler();

private int phase;
Expand All @@ -74,7 +81,9 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen

private long replyTimeout = DEFAULT_REPLY_TIMEOUT;

private volatile boolean schedulerSet;
private boolean schedulerSet;

private boolean sharedReplyTopic;

private volatile boolean running;

Expand All @@ -89,6 +98,28 @@ public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
Assert.notNull(replyContainer, "'replyContainer' cannot be null");
this.replyContainer = replyContainer;
this.replyContainer.setupMessageListener(this);
ContainerProperties properties = this.replyContainer.getContainerProperties();
String replyTopic = null;
byte[] replyPartition = null;
if (properties.getTopics() != null && properties.getTopics().length == 1) {
replyTopic = properties.getTopics()[0];
}
else if (properties.getTopicPartitions() != null && properties.getTopicPartitions().length == 1) {
replyTopic = properties.getTopicPartitions()[0].topic();
ByteBuffer buffer = ByteBuffer.allocate(4);
buffer.putInt(properties.getTopicPartitions()[0].partition());
replyPartition = buffer.array();
}
if (replyTopic == null) {
this.replyTopic = null;
this.replyPartition = null;
this.logger.debug("Could not determine container's reply topic/partition; senders must populate "
+ "at least the " + KafkaHeaders.REPLY_PARTITION + " header");
}
else {
this.replyTopic = replyTopic.getBytes(StandardCharsets.UTF_8);
this.replyPartition = replyPartition;
}
}

public void setTaskScheduler(TaskScheduler scheduler) {
Expand Down Expand Up @@ -133,6 +164,16 @@ public Collection<TopicPartition> getAssignedReplyTopicPartitions() {
return this.replyContainer.getAssignedPartitions();
}

/**
* Set to true when multiple templates are using the same topic for replies.
* This simply changes logs for unexpected replies to debug instead of error.
* @param sharedReplyTopic true if using a shared topic.
* @since 2.2
*/
public void setSharedReplyTopic(boolean sharedReplyTopic) {
this.sharedReplyTopic = sharedReplyTopic;
}

@Override
public void afterPropertiesSet() throws Exception {
if (!this.schedulerSet) {
Expand Down Expand Up @@ -174,7 +215,21 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
Assert.state(this.running, "Template has not been start()ed"); // NOSONAR (sync)
CorrelationKey correlationId = createCorrelationId(record);
Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
record.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, correlationId.getCorrelationId()));
boolean hasReplyTopic = false;
Headers headers = record.headers();
Iterator<Header> iterator = headers.iterator();
while (iterator.hasNext() && !hasReplyTopic) {
if (iterator.next().key().equals(KafkaHeaders.REPLY_TOPIC)) {
hasReplyTopic = true;
}
}
if (!hasReplyTopic && this.replyTopic != null) {
headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, this.replyTopic));
if (this.replyPartition != null) {
headers.add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, this.replyPartition));
}
}
headers.add(new RecordHeader(KafkaHeaders.CORRELATION_ID, correlationId.getCorrelationId()));
if (this.logger.isDebugEnabled()) {
this.logger.debug("Sending: " + record + " with correlationId: " + correlationId);
}
Expand Down Expand Up @@ -240,8 +295,14 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
else {
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
this.logger.error("No pending reply: " + record + " with correlationId: "
+ correlationId + ", perhaps timed out");
if (this.sharedReplyTopic) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(missingCorrelationLogMessage(record, correlationId));
}
}
else if (this.logger.isErrorEnabled()) {
this.logger.error(missingCorrelationLogMessage(record, correlationId));
}
}
else {
if (this.logger.isDebugEnabled()) {
Expand All @@ -253,6 +314,11 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
});
}

private String missingCorrelationLogMessage(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
return "No pending reply: " + record + " with correlationId: "
+ correlationId + ", perhaps timed out, or using a shared reply topic";
}

/**
* A listenable future for requests/replies.
*
Expand Down
Expand Up @@ -52,6 +52,7 @@
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
Expand Down Expand Up @@ -99,7 +100,6 @@ public void testGood() throws Exception {
try {
template.setReplyTimeout(30_000);
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, "foo");
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, A_REPLY.getBytes()));
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
Expand Down Expand Up @@ -127,19 +127,37 @@ public void testMultiListenerMessageReturn() throws Exception {
}
}

@Test
public void testGoodDefaultReplyHeaders() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(
new TopicPartitionInitialOffset(A_REPLY, 3));
try {
template.setReplyTimeout(30_000);
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(A_REQUEST, "bar");
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value()).isEqualTo("BAR");
assertThat(consumerRecord.partition()).isEqualTo(3);
}
finally {
template.stop();
}
}

@Test
public void testGoodSamePartition() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(A_REPLY);
try {
template.setReplyTimeout(30_000);
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, 2, null, "foo");
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, 2, null, "baz");
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, A_REPLY.getBytes()));
record.headers()
.add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, new byte[] { 0, 0, 0, 2 }));
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value()).isEqualTo("FOO");
assertThat(consumerRecord.value()).isEqualTo("BAZ");
assertThat(consumerRecord.partition()).isEqualTo(2);
}
finally {
Expand All @@ -152,7 +170,7 @@ public void testTimeout() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(A_REPLY);
try {
template.setReplyTimeout(1);
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, "foo");
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, "fiz");
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, A_REPLY.getBytes()));
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
Expand Down Expand Up @@ -214,13 +232,33 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
containerProperties);
container.setBeanName(this.testName.getMethodName());
ReplyingKafkaTemplate<Integer, String, String> template = new ReplyingKafkaTemplate<>(this.config.pf(), container);
template.setSharedReplyTopic(true);
template.start();
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(template.getAssignedReplyTopicPartitions()).hasSize(5);
assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic);
return template;
}

public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionInitialOffset topic)
throws Exception {

ContainerProperties containerProperties = new ContainerProperties(topic);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName.getMethodName(), "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProperties);
container.setBeanName(this.testName.getMethodName());
ReplyingKafkaTemplate<Integer, String, String> template = new ReplyingKafkaTemplate<>(this.config.pf(), container);
template.setSharedReplyTopic(true);
template.start();
assertThat(template.getAssignedReplyTopicPartitions()).hasSize(1);
assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic.topic());
return template;
}

@Configuration
@EnableKafka
public static class Config {
Expand Down
23 changes: 19 additions & 4 deletions src/reference/asciidoc/kafka.adoc
Expand Up @@ -371,7 +371,6 @@ public class KRequestingApplication {
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get();
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
Expand All @@ -384,12 +383,14 @@ public class KRequestingApplication {
public ReplyingKafkaTemplate<String, String, String> kafkaTemplate(
ProducerFactory<String, String> pf,
KafkaMessageListenerContainer<String, String> replyContainer) {
return new ReplyingKafkaTemplate<>(pf, replyContainer);
}
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer(
ConsumerFactory<String, String> cf) {
ContainerProperties containerProperties = new ContainerProperties("kReplies");
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
Expand All @@ -407,7 +408,7 @@ public class KRequestingApplication {
}
----

In addition to the reply topic header set by user code, the template sets a header `KafkaHeaders.CORRELATION_ID` which must be echoed back by the server side.
The template sets a header `KafkaHeaders.CORRELATION_ID` which must be echoed back by the server side.

In this case, simple `@KafkaListener` application responds:

Expand Down Expand Up @@ -444,9 +445,23 @@ public class KReplyingApplication {

The `@KafkaListener` infrastructure echoes the correlation id and determines the reply topic.

See <<annotation-send-to>> for more information about sending replies; in this case we use the default header `KafKaHeaders.REPLY_TOPIC` to indicate which topic the reply goes to.
See <<annotation-send-to>> for more information about sending replies; the template uses the default header `KafKaHeaders.REPLY_TOPIC` to indicate which topic the reply goes to.

Starting with version 2.2, the template will attempt to detect the reply topic/partition from the configured reply container.
If the container is configured to listen to a single topic or a single `TopicPartitionInitialOffset`, it will be used to set the reply headers.
If the container is configured otherwise, the user must set up the reply header(s); in this case, an INFO log is written during initialization.

[source, java]
----
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
----

When configuring with a single reply `TopicPartitionInitialOffset`, you can use the same reply topic for multiple templates, as long as each instance listens on a different partition.
When configuring with a single reply topic, each instance must use a different `group.id` - in this case, all instances will receive each reply, but only the instance that sent the request will find the correlation id.
This may be useful for auto-scaling, but with the overhead of additional network traffic and the small cost of discarding each unwanted reply.
When using this setting, it is recommended that you set the template's `sharedReplyTopic` to true, which will reduce the logging level of unexpected replies to DEBUG instead of the default ERROR.

IMPORTANT: If you have multiple client instances, each will need a dedicated reply topic for each instance.
IMPORTANT: If you have multiple client instances, and you don't configure them as discussed in the paragraphe above, each instance will need a dedicated reply topic.
An alternative is to set the `KafkaHeaders.REPLY_PARTITION` and use a dedicated partition for each instance; the `Header` contains a 4 byte int (Big-endian).
The server must use this header to route the reply to the correct topic (`@KafkaListener` does this).
In this case, though, the reply container must not use Kafka's group management feature and must be configured to listen on a fixed partition (using a `TopicPartitionInitialOffset` in its `ContainerProperties` constructor).
Expand Down

0 comments on commit 82322d4

Please sign in to comment.