Skip to content

Commit

Permalink
INT-4538: Fix Splitter for ArrayNode.size()
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4538

* Add support for Jackson `TreeNode.size()` in the `AbstractMessageSplitter`
* Add overloaded `Transformers.toJson(ObjectToJsonTransformer.ResultType)`
* Modify test to verify Jackson `ArrayNode` use-case with the splitter
and aggregator

**Cherry-pick to 5.0.x**
  • Loading branch information
artembilan authored and garyrussell committed Oct 8, 2018
1 parent c4d6cf2 commit 716494c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ public static ObjectToJsonTransformer toJson(JsonObjectMapper<?, ?> jsonObjectMa
return toJson(jsonObjectMapper, null, contentType);
}

/**
* Factory for the {@link ObjectToJsonTransformer} based on the provided {@link ObjectToJsonTransformer.ResultType}.
* @param resultType the {@link ObjectToJsonTransformer.ResultType} to use.
* Defaults to {@link ObjectToJsonTransformer.ResultType#STRING}.
* @return the ObjectToJsonTransformer
* @since 5.0.9
*/
public static ObjectToJsonTransformer toJson(ObjectToJsonTransformer.ResultType resultType) {
return toJson(null, resultType, null);
}

public static ObjectToJsonTransformer toJson(ObjectToJsonTransformer.ResultType resultType, String contentType) {
return toJson(null, resultType, contentType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.json.JacksonPresent;
import org.springframework.integration.util.FunctionIterator;
import org.springframework.messaging.Message;

import com.fasterxml.jackson.core.TreeNode;
import reactor.core.publisher.Flux;

/**
Expand Down Expand Up @@ -163,12 +165,21 @@ else if (result instanceof Publisher<?>) {
/**
* Obtain a size of the provided {@link Iterable}. Default implementation returns
* {@link Collection#size()} if the iterable is a collection, or {@code 0} otherwise.
* If iterable is a Jackson {@code TreeNode}, then its size is used.
* @param iterable the {@link Iterable} to obtain the size
* @return the size of the {@link Iterable}
* @since 5.0
*/
protected int obtainSizeIfPossible(Iterable<?> iterable) {
return iterable instanceof Collection ? ((Collection<?>) iterable).size() : 0;
if (iterable instanceof Collection) {
return ((Collection<?>) iterable).size();
}
else if (JacksonPresent.isJackson2Present() && JacksonNodeHelper.isNode(iterable)) {
return JacksonNodeHelper.nodeSize(iterable);
}
else {
return 0;
}
}

/**
Expand Down Expand Up @@ -266,4 +277,18 @@ public String getComponentType() {
*/
protected abstract Object splitMessage(Message<?> message);


private static class JacksonNodeHelper {

private static boolean isNode(Object object) {
return object instanceof TreeNode;
}

@SuppressWarnings("unchecked")
private static int nodeSize(Object node) {
return ((TreeNode) node).size();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
import org.springframework.integration.channel.QueueChannel;
Expand All @@ -45,7 +44,9 @@
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannelSpec;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.handler.MessageTriggerAction;
import org.springframework.integration.json.ObjectToJsonTransformer;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
Expand All @@ -55,6 +56,8 @@
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

import com.fasterxml.jackson.databind.node.TextNode;

/**
* @author Artem Bilan
* @author Gary Russell
Expand All @@ -73,7 +76,6 @@ public class CorrelationHandlerTests {


@Autowired
@Qualifier("splitAggregateInput")
private MessageChannel splitAggregateInput;

@Autowired
Expand Down Expand Up @@ -114,7 +116,7 @@ public void testSplitterResequencer() {

@Test
public void testSplitterAggregator() {
List<Character> payload = Arrays.asList('a', 'b', 'c', 'd', 'e');
List<String> payload = Arrays.asList("a", "b", "c", "d", "e");

QueueChannel replyChannel = new QueueChannel();
this.splitAggregateInput.send(MessageBuilder.withPayload(payload)
Expand All @@ -127,7 +129,8 @@ public void testSplitterAggregator() {
@SuppressWarnings("unchecked")
List<Object> result = (List<Object>) receive.getPayload();
for (int i = 0; i < payload.size(); i++) {
assertEquals(payload.get(i), result.get(i));
assertThat(result.get(i), instanceOf(TextNode.class));
assertEquals(TextNode.valueOf(payload.get(i)), result.get(i));
}
}

Expand Down Expand Up @@ -204,6 +207,7 @@ public IntegrationFlow splitResequenceFlow(MessageChannel executorChannel) {
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlows.from("splitAggregateInput", true)
.transform(Transformers.toJson(ObjectToJsonTransformer.ResultType.NODE))
.split()
.channel(MessageChannels.executor(taskExecutor()))
.resequence()
Expand Down Expand Up @@ -260,7 +264,6 @@ public IntegrationFlow barrierFlow() {
}

@Bean
@DependsOn("barrierFlow")
public IntegrationFlow releaseBarrierFlow(MessageTriggerAction barrierTriggerAction) {
return IntegrationFlows.from(releaseChannel())
.trigger(barrierTriggerAction,
Expand Down
8 changes: 4 additions & 4 deletions src/reference/asciidoc/splitter.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ We recommend this approach, because it decouples the code from the Spring Integr

===== Iterators

Starting with _version 4.1_, the `AbstractMessageSplitter` supports the `Iterator` type for the `value` to split.
Starting with version 4.1, the `AbstractMessageSplitter` supports the `Iterator` type for the `value` to split.
Note, in the case of an `Iterator` (or `Iterable`), we don't have access to the number of underlying items and the `SEQUENCE_SIZE` header is set to `0`.
This means that the default `SequenceSizeReleaseStrategy` of an `<aggregator>` won't work and the group for the `CORRELATION_ID` from the `splitter` won't be released; it will remain as `incomplete`.
In this case you should use an appropriate custom `ReleaseStrategy` or rely on `send-partial-result-on-expiry` together with `group-timeout` or a `MessageGroupStoreReaper`.

Starting with _version 5.0_, the `AbstractMessageSplitter` provides `protected obtainSizeIfPossible()` methods to allow the determination of the size of the `Iterable` and `Iterator` objects if that is possible.
Starting with version 5.0, the `AbstractMessageSplitter` provides `protected obtainSizeIfPossible()` methods to allow the determination of the size of the `Iterable` and `Iterator` objects if that is possible.
For example `XPathMessageSplitter` can determine the size of the underlying `NodeList` object.
And starting with version 5.0.9, this method also properly returns a size of the `com.fasterxml.jackson.core.TreeNode`.

An `Iterator` object is useful to avoid the need for building an entire collection in the memory before splitting.
For example, when underlying items are populated from some external system (e.g.
DataBase or FTP `MGET`) using iterations or streams.
For example, when underlying items are populated from some external system (e.g. DataBase or FTP `MGET`) using iterations or streams.

===== Stream and Flux

Expand Down

0 comments on commit 716494c

Please sign in to comment.