Skip to content

Commit

Permalink
GH-3207: RSocket inbound: decode each flux item (#3208)
Browse files Browse the repository at this point in the history
* GH-3207: RSocket inbound: decode each flux item

Fixes #3207

Previously an incoming RSocket Publisher has been decoded as a single unit
leading to extra work on the client side, e.g. a delimiter has to be provided
to treat each payload item as independent

* To have a consistency with Spring Messaging and its `PayloadMethodArgumentResolver`
change an `RSocketInboundGateway` to process inbound payloads as `Flux` and decode
each item independently.
* Change `RSocketDslTests` to remove delimiters and make it consistent with the regular
`RSocketRequester` client

* * Add `decodeFluxAsUnit` option into `RSocketInboundGateway`
* Document the change
  • Loading branch information
artembilan committed Mar 10, 2020
1 parent 1511dd8 commit c45cc66
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,7 +37,6 @@ public class RSocketInboundGatewayParser extends AbstractInboundGatewayParser {

private static final List<String> NON_ELIGIBLE_ATTRIBUTES =
Arrays.asList("path",
"interaction-models",
"rsocket-strategies",
"rsocket-connector",
"request-element-type");
Expand All @@ -61,7 +60,6 @@ protected void doPostProcess(BeanDefinitionBuilder builder, Element element) {
"rSocketStrategies");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "rsocket-connector",
"RSocketConnector");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "interaction-models");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.springframework.integration.rsocket.inbound.RSocketInboundGateway;
import org.springframework.messaging.rsocket.RSocketStrategies;

import reactor.core.publisher.Flux;

/**
* The {@link MessagingGatewaySpec} implementation for the {@link RSocketInboundGateway}.
*
Expand Down Expand Up @@ -82,4 +84,16 @@ public RSocketInboundGatewaySpec requestElementType(ResolvableType requestElemen
return this;
}

/**
* Configure an option to decode an incoming {@link Flux} as a single unit or each its event separately.
* @param decodeFluxAsUnit decode incoming {@link Flux} as a single unit or each event separately.
* @return the spec
* @since 5.3
* @see RSocketInboundGateway#setDecodeFluxAsUnit(boolean)
*/
public RSocketInboundGatewaySpec decodeFluxAsUnit(boolean decodeFluxAsUnit) {
this.target.setDecodeFluxAsUnit(decodeFluxAsUnit);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,6 +85,8 @@ public class RSocketInboundGateway extends MessagingGatewaySupport implements In
@Nullable
private ResolvableType requestElementType;

private boolean decodeFluxAsUnit;

/**
* Instantiate based on the provided path patterns to map this endpoint for incoming RSocket requests.
* @param pathArg the mapping patterns to use.
Expand Down Expand Up @@ -160,6 +162,20 @@ public void setRequestElementType(ResolvableType requestElementType) {
this.requestElementType = requestElementType;
}

/**
* Configure an option to decode an incoming {@link Flux} as a single unit or each its event separately.
* Defaults to {@code false} for consistency with Spring Messaging {@code @MessageMapping}.
* The target {@link Flux} decoding logic depends on the {@link Decoder} selected.
* For example a {@link org.springframework.core.codec.StringDecoder} requires a new line separator to
* be present in the stream to indicate a byte buffer end.
* @param decodeFluxAsUnit decode incoming {@link Flux} as a single unit or each event separately.
* @since 5.3
* @see Decoder#decode(Publisher, ResolvableType, MimeType, java.util.Map)
*/
public void setDecodeFluxAsUnit(boolean decodeFluxAsUnit) {
this.decodeFluxAsUnit = decodeFluxAsUnit;
}

@Override
protected void onInit() {
super.onInit();
Expand Down Expand Up @@ -219,14 +235,17 @@ private Mono<Message<?>> decodeRequestMessage(Message<?> requestMessage) {
@SuppressWarnings("unchecked")
@Nullable
private Object decodePayload(Message<?> requestMessage) {
ResolvableType elementType = this.requestElementType;
ResolvableType elementType;
MimeType mimeType = requestMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class);
if (elementType == null) {
if (this.requestElementType == null) {
elementType =
mimeType != null && "text".equals(mimeType.getType())
? ResolvableType.forClass(String.class)
: ResolvableType.forClass(byte[].class);
}
else {
elementType = this.requestElementType;
}

Object payload = requestMessage.getPayload();

Expand All @@ -235,9 +254,18 @@ private Object decodePayload(Message<?> requestMessage) {
if (payload instanceof DataBuffer) {
return decoder.decode((DataBuffer) payload, elementType, mimeType, null);
}
else {
else if (this.decodeFluxAsUnit) {
return decoder.decode((Publisher<DataBuffer>) payload, elementType, mimeType, null);
}
else {
return Flux.from((Publisher<DataBuffer>) payload)
.handle((buffer, synchronousSink) -> {
Object value = decoder.decode(buffer, elementType, mimeType, null);
if (value != null) {
synchronousSink.next(value);
}
});
}
}

private Flux<DataBuffer> createReply(Object reply, Message<?> requestMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="decode-flux-as-unit" default="false">
<xsd:annotation>
<xsd:documentation>
Decode incoming Flux as a single unit or each event separately.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:boolean xsd:string"/>
</xsd:simpleType>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
auto-startup="false"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
request-element-type="byte[]"
decode-flux-as-unit="true"/>

</beans>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,7 +44,7 @@ class RSocketInboundGatewayParserTests {
private RSocketInboundGateway inboundGateway;

@Test
void testOutboundGatewayParser() {
void testInboundGatewayParser() {
assertThat(TestUtils.getPropertyValue(this.inboundGateway, "rsocketConnector"))
.isSameAs(this.clientRSocketConnector);
assertThat(TestUtils.getPropertyValue(this.inboundGateway, "rsocketStrategies"))
Expand All @@ -54,6 +54,7 @@ void testOutboundGatewayParser() {
.isEqualTo(byte[].class);
assertThat(this.inboundGateway.getInteractionModels())
.containsExactly(RSocketInteractionModel.fireAndForget, RSocketInteractionModel.requestChannel);
assertThat(TestUtils.getPropertyValue(this.inboundGateway, "decodeFluxAsUnit", Boolean.class)).isTrue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

Expand All @@ -48,17 +50,30 @@ public class RSocketDslTests {

@Autowired
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
private Function<Object, Flux<String>> rsocketUpperCaseFlowFunction;

@Test
void testRsocketUpperCaseFlows() {
Flux<String> result = this.rsocketUpperCaseFlowFunction.apply(Flux.just("a\n", "b\n", "c\n"));
Flux<String> result = this.rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c"));

StepVerifier.create(result)
.expectNext("A", "B", "C")
.verifyComplete();
}

@Test
void testRsocketUpperCaseWholeFlows() {
Message<Flux<String>> testMessage =
MessageBuilder.withPayload(Flux.just("a", "b", "c", "\n"))
.setHeader("route", "/uppercaseWhole")
.build();
Flux<String> result = this.rsocketUpperCaseFlowFunction.apply(testMessage);

StepVerifier.create(result)
.expectNext("ABC")
.verifyComplete();
}

@Configuration
@EnableIntegration
public static class TestConfiguration {
Expand All @@ -80,7 +95,8 @@ public ClientRSocketConnector clientRSocketConnector(ServerRSocketConnector serv
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.handle(RSockets.outboundGateway(message ->
message.getHeaders().getOrDefault("route", "/uppercase"))
.interactionModel((message) -> RSocketInteractionModel.requestChannel)
.expectedResponseType("T(java.lang.String)")
.clientRSocketConnector(clientRSocketConnector),
Expand All @@ -100,6 +116,16 @@ public IntegrationFlow rsocketUpperCaseFlow() {
.get();
}

@Bean
public IntegrationFlow rsocketUpperCaseWholeFlow() {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercaseWhole")
.interactionModels(RSocketInteractionModel.requestChannel)
.decodeFluxAsUnit(true))
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}

}

}
6 changes: 6 additions & 0 deletions src/reference/asciidoc/rsocket.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ The `payload` of the message to send downstream is always a `Flux` according to
When in a `fireAndForget` RSocket interaction model, the message has a plain converted `payload`.
The reply `payload` could be a plain object or a `Publisher` - the `RSocketInboundGateway` converts both of them properly into an RSocket response according to the encoders provided in the `RSocketStrategies`.

Starting with version 5.3, a `decodeFluxAsUnit` option (default `false`) is added to the `RSocketInboundGateway`.
By default incoming `Flux` is transformed the way that each its event is decoded separately.
This is an exact behavior present currently with `@MessageMapping` semantics.
To restore a previous behavior or decode the whole `Flux` as single unit according application requirements, the `decodeFluxAsUnit` has to be set to `true`.
However the target decoding logic depends on the `Decoder` selected, e.g. a `StringDecoder` requires a new line separator (by default) to be present in the stream to indicate a byte buffer end.

See <<rsocket-java-config>> for samples how to configure an `RSocketInboundGateway` endpoint and deal with payloads downstream.

[[rsocket-outbound]]
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,9 @@ See <<./ws.adoc#ws,Web Services Support>> for more information.

The `FailoverClientConnectionFactory` no longer fails back, by default, until the current connection fails.
See <<./ip.adoc#failover-cf,TCP Failover Client Connection Factory>> for more information.

[[x5.3-rsocket]]
=== RSocket Changes

A `decodeFluxAsUnit` option has been added to the `RSocketInboundGateway` with the meaning to decode incoming `Flux` as a single unit or apply decoding for each event in it.
See <<./rsocket.adoc#rsocket-inbound,RSocket Inbound Gateway>> for more information.

0 comments on commit c45cc66

Please sign in to comment.