Skip to content

Commit

Permalink
GH-1340: Support returning a collection
Browse files Browse the repository at this point in the history
Resolves #1340

Previously, a reply type of `Collection` always split the collection
into discrete replies.

There is now an option to send the entire collection in the reply.

* Fix cast.

* Fix doc link.

* GH-1340: Fix test
  • Loading branch information
garyrussell committed Dec 23, 2019
1 parent 2fd4ddb commit aec7807
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,13 @@
*/
String[] properties() default {};

/**
* When false and the return type is a {@link Iterable} return the result as the value
* of a single reply record instead of individual records for each element. Default
* true. Ignored if the reply is of type {@code Iterable<Message<?>>}.
* @return false to create a single reply record.
* @since 2.3.5
*/
boolean splitIterables() default true;

}
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
}
resolveKafkaProperties(endpoint, kafkaListener.properties());
endpoint.setSplitIterables(kafkaListener.splitIterables());

KafkaListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>

private Properties consumerProperties;

private boolean splitIterables = true;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
Expand Down Expand Up @@ -432,6 +434,21 @@ public void setConsumerProperties(Properties consumerProperties) {
this.consumerProperties = consumerProperties;
}

@Override
public boolean isSplitIterables() {
return this.splitIterables;
}

/**
* Set to false to disable splitting {@link Iterable} reply values into separate
* records.
* @param splitIterables false to disable; default true.
* @since 2.3.5
*/
public void setSplitIterables(boolean splitIterables) {
this.splitIterables = splitIterables;
}

@Override
public void afterPropertiesSet() {
boolean topicsEmpty = getTopics().isEmpty();
Expand Down Expand Up @@ -470,6 +487,7 @@ private void setupMessageListener(MessageListenerContainer container, MessageCon
if (this.replyHeadersConfigurer != null) {
adapter.setReplyHeadersConfigurer(this.replyHeadersConfigurer);
}
adapter.setSplitIterables(this.splitIterables);
Object messageListener = adapter;
Assert.state(messageListener != null,
() -> "Endpoint [" + this + "] must provide a non null message listener");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,11 @@ default Properties getConsumerProperties() {
*/
void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter);

/**
* When true, {@link Iterable} return results will be split into discrete records.
* @return true to split.
* @since 2.3.5
*/
boolean isSplitIterables();

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,9 @@ public void setupListenerContainer(MessageListenerContainer listenerContainer,
MessageConverter messageConverter) {
}

@Override
public boolean isSplitIterables() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -123,6 +124,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private ReplyHeadersConfigurer replyHeadersConfigurer;

private boolean splitIterables = true;

public MessagingMessageListenerAdapter(Object bean, Method method) {
this.bean = bean;
this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
Expand Down Expand Up @@ -252,6 +255,25 @@ public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigu
this.replyHeadersConfigurer = replyHeadersConfigurer;
}

/**
* When true, {@link Iterable} return results will be split into discrete records.
* @return true to split.
* @since 2.3.5
*/
protected boolean isSplitIterables() {
return this.splitIterables;
}

/**
* Set to false to disable splitting {@link Iterable} reply values into separate
* records.
* @param splitIterables false to disable; default true.
* @since 2.3.5
*/
public void setSplitIterables(boolean splitIterables) {
this.splitIterables = splitIterables;
}

@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
if (this.bean instanceof ConsumerSeekAware) {
Expand Down Expand Up @@ -406,15 +428,25 @@ else if (result instanceof Message) {
this.replyTemplate.send((Message<?>) result);
}
else {
if (result instanceof Collection) {
((Collection<V>) result).forEach(v -> {
if (v instanceof Message) {
this.replyTemplate.send((Message<?>) v);
}
else {
this.replyTemplate.send(topic, v);
}
});
if (result instanceof Iterable) {
Iterator<?> iterator = ((Iterable<?>) result).iterator();
boolean iterableOfMessages = false;
if (iterator.hasNext()) {
iterableOfMessages = iterator.next() instanceof Message;
}
if (iterableOfMessages || this.splitIterables) {
((Iterable<V>) result).forEach(v -> {
if (v instanceof Message) {
this.replyTemplate.send((Message<?>) v);
}
else {
this.replyTemplate.send(topic, v);
}
});
}
else {
sendSingleResult(result, topic, source);
}
}
else {
sendSingleResult(result, topic, source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@
import org.springframework.kafka.support.converter.ProjectingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
Expand Down Expand Up @@ -153,7 +155,8 @@
"annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply",
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle" })
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
"annotated38", "annotated38reply" })
public class EnableKafkaIntegrationTests {

private static final String DEFAULT_TEST_GROUP_ID = "testAnnot";
Expand Down Expand Up @@ -802,6 +805,41 @@ public void testSeekToLastOnIdle() throws InterruptedException {
assertThat(KafkaTestUtils.getPropertyValue(this.seekOnIdleListener, "callbacks", Map.class)).hasSize(0);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testReplyingBatchListenerReturnCollection() {
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testReplyingBatchListenerReturnCollection");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
ConsumerFactory<Integer, Object> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, Object> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "annotated38reply");
template.send("annotated38", 0, 0, "FoO");
template.send("annotated38", 0, 0, "BaR");
template.flush();
ConsumerRecords replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
Iterator<ConsumerRecord<?, ?>> iterator = replies.iterator();
Object value = iterator.next().value();
assertThat(value).isInstanceOf(List.class);
List list = (List) value;
assertThat(list).hasSizeGreaterThanOrEqualTo(1);
assertThat(list.get(0)).isEqualTo("FOO");
if (list.size() > 1) {
assertThat(list.get(1)).isEqualTo("BAR");
}
else {
replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
iterator = replies.iterator();
value = iterator.next().value();
list = (List) value;
assertThat(list).hasSize(1);
assertThat(list.get(0)).isEqualTo("BAR");
}
consumer.close();
}

@Configuration
@EnableKafka
@EnableTransactionManagement(proxyTargetClass = true)
Expand Down Expand Up @@ -850,7 +888,7 @@ public ChainedKafkaTransactionManager<Integer, String> cktm() {
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(recordFilter());
factory.setReplyTemplate(partitionZeroReplyingTemplate());
factory.setReplyTemplate(partitionZeroReplyTemplate());
factory.setErrorHandler((ConsumerAwareErrorHandler) (t, d, c) -> {
this.globalErrorThrowable = t;
c.seek(new org.apache.kafka.common.TopicPartition(d.topic(), d.partition()), d.offset());
Expand All @@ -867,7 +905,7 @@ public ChainedKafkaTransactionManager<Integer, String> cktm() {
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(recordFilter());
factory.setReplyTemplate(partitionZeroReplyingTemplate());
factory.setReplyTemplate(partitionZeroReplyTemplate());
factory.setErrorHandler((ConsumerAwareErrorHandler) (t, d, c) -> {
this.globalErrorThrowable = t;
c.seek(new org.apache.kafka.common.TopicPartition(d.topic(), d.partition()), d.offset());
Expand Down Expand Up @@ -987,7 +1025,19 @@ public KafkaListenerContainerFactory<?> batchFactory() {
factory.setBatchListener(true);
factory.setRecordFilterStrategy(recordFilter());
// always send to the same partition so the replies are in order for the test
factory.setReplyTemplate(partitionZeroReplyingTemplate());
factory.setReplyTemplate(partitionZeroReplyTemplate());
return factory;
}

@Bean
public KafkaListenerContainerFactory<?> batchJsonReplyFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setRecordFilterStrategy(recordFilter());
// always send to the same partition so the replies are in order for the test
factory.setReplyTemplate(partitionZeroReplyJsonTemplate());
return factory;
}

Expand Down Expand Up @@ -1017,7 +1067,7 @@ public KafkaListenerContainerFactory<?> batchSpyFactory() {
factory.setBatchListener(true);
factory.setRecordFilterStrategy(recordFilter());
// always send to the same partition so the replies are in order for the test
factory.setReplyTemplate(partitionZeroReplyingTemplate());
factory.setReplyTemplate(partitionZeroReplyTemplate());
factory.setMissingTopicsFatal(false);
return factory;
}
Expand Down Expand Up @@ -1167,6 +1217,13 @@ public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public ProducerFactory<Integer, Object> jsonProducerFactory() {
Map<String, Object> producerConfigs = producerConfigs();
producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(producerConfigs);
}

@Bean
public ProducerFactory<Integer, String> txProducerFactory() {
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
Expand Down Expand Up @@ -1197,7 +1254,7 @@ public KafkaTemplate<byte[], String> bytesKeyTemplate() {
}

@Bean
public KafkaTemplate<Integer, String> partitionZeroReplyingTemplate() {
public KafkaTemplate<Integer, String> partitionZeroReplyTemplate() {
// reply always uses the no-partition, no-key method; subclasses can be used
return new KafkaTemplate<Integer, String>(producerFactory(), true) {

Expand All @@ -1209,6 +1266,19 @@ public ListenableFuture<SendResult<Integer, String>> send(String topic, String d
};
}

@Bean
public KafkaTemplate<Integer, Object> partitionZeroReplyJsonTemplate() {
// reply always uses the no-partition, no-key method; subclasses can be used
return new KafkaTemplate<Integer, Object>(jsonProducerFactory(), true) {

@Override
public ListenableFuture<SendResult<Integer, Object>> send(String topic, Object data) {
return super.send(topic, 0, null, data);
}

};
}

@Bean
public KafkaTemplate<Integer, String> kafkaJsonTemplate() {
KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
Expand Down Expand Up @@ -1727,6 +1797,15 @@ public Collection<String> replyingBatchListenerWithErrorHandler(List<String> in)
throw new RuntimeException("return this");
}

@KafkaListener(id = "replyingBatchListenerCollection", topics = "annotated38",
containerFactory = "batchJsonReplyFactory", splitIterables = false)
@SendTo("annotated38reply")
public Collection<String> replyingBatchListenerCollection(List<String> in) {
return in.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
}

@KafkaListener(id = "batchAckListener", topics = { "annotated26", "annotated27" },
containerFactory = "batchFactory")
public void batchAckListener(List<String> in,
Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,11 @@ public KafkaListenerErrorHandler voidSendToErrorHandler() {
See <<annotation-error-handling>> for more information.
====

NOTE: If a listener method returns an `Iterable`, by default a record for each element as the value is sent.
Starting with version 2.3.5, set the `splitIterables` property on `@KafkaListener` to `false` and the entire result will be sent as the value of a single `ProducerRecord`.
This requires a suitable serializer in the reply template's producer configuration.
However, if the reply is `Iterable<Message<?>>` the property is ignored and each message is sent separately.

===== Filtering Messages

In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered.
Expand Down
Loading

0 comments on commit aec7807

Please sign in to comment.