Description
Spring Framework version: 5.3.2
Problem background:
The application needs to send the result of a DB query over an RSocket channel. In some cases the result set contains no elements, so an empty Flux is sent. This is normal from the business-logic point of view.
I discussed this problem with @OlegDokuka, and he suggested filing the issue here (https://gitter.im/rsocket/rsocket-java?at=5fe36654ac9d8e7463c97042)
Application (server) entrypoint:
// DAYS is presumably a static import of java.time.temporal.ChronoUnit.DAYS
@MessageMapping("objectChannel.{test}")
Flux<Instant> rsocketObjectChannel(@DestinationVariable String test, Flux<Instant> flux) {
    return flux
            .map(i -> i.plus(1, DAYS))
            .doOnNext(s -> System.out.println("rsocketObjectChannel.next: " + s))
            .doOnComplete(() -> System.out.println("rsocketObjectChannel complete"));
}
Test (client) side:
@Test
void testEmptyObjectChannel() {
    Flux<Instant> data = Flux.empty();
    Flux<Instant> result = requester.route("objectChannel.{test}", "route-var1")
            .data(data)
            .retrieveFlux(Instant.class)
            .doOnComplete(() -> System.out.println("testObjectChannel complete"));
    StepVerifier
            .create(result)
            .verifyComplete();
}
Result:
org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 1 in reactor.core.publisher.Flux<java.time.Instant> com.gt.example.reactivedemo.RSocketServerHandler.rsocketObjectChannel(java.lang.String,reactor.core.publisher.Flux<java.time.Instant>): Failed to read HTTP message; nested exception is org.springframework.core.codec.DecodingException: JSON decoding error: No content to map due to end-of-input; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
at [Source: (io.netty.buffer.ByteBufInputStream); line: -1, column: 0]
at org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver.handleReadError(PayloadMethodArgumentResolver.java:266) ~[spring-messaging-5.3.2.jar:5.3.2]
at org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver.lambda$decodeContent$2(PayloadMethodArgumentResolver.java:236) ~[spring-messaging-5.3.2.jar:5.3.2]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:112) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxSwitchOnFirst$SwitchOnFirstMain.tryOnNext(FluxSwitchOnFirst.java:363) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.drain(FluxSwitchOnFirst.java:297) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.request(FluxSwitchOnFirst.java:259) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:162) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:137) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:162) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:162) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:170) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxSwitchOnFirst$SwitchOnFirstMain.subscribe(FluxSwitchOnFirst.java:352) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.1.jar:3.4.1]
at org.springframework.messaging.handler.invocation.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:85) ~[spring-messaging-5.3.2.jar:5.3.2]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:325) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:208) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.Flux.subscribe(Flux.java:8147) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.onNext(FluxSwitchOnFirst.java:157) ~[reactor-core-3.4.1.jar:3.4.1]
at io.rsocket.core.RequestChannelResponderSubscriber.request(RequestChannelResponderSubscriber.java:241) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.onSubscribe(FluxSwitchOnFirst.java:127) ~[reactor-core-3.4.1.jar:3.4.1]
at io.rsocket.core.RequestChannelResponderSubscriber.subscribe(RequestChannelResponderSubscriber.java:164) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.1.jar:3.4.1]
at io.rsocket.core.RSocketResponder.handleChannel(RSocketResponder.java:410) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:208) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.1.jar:3.4.1]
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.1.jar:3.4.1]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:265) ~[reactor-netty-core-1.0.2.jar:1.0.2]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:371) ~[reactor-netty-core-1.0.2.jar:1.0.2]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381) ~[reactor-netty-core-1.0.2.jar:1.0.2]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:544) ~[reactor-netty-http-1.0.2.jar:1.0.2]
at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:161) ~[reactor-netty-http-1.0.2.jar:1.0.2]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.2.jar:1.0.2]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.55.Final.jar:4.1.55.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311) ~[netty-codec-4.1.55.Final.jar:4.1.55.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432) ~[netty-codec-4.1.55.Final.jar:4.1.55.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-codec-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.55.Final.jar:4.1.55.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.55.Final.jar:4.1.55.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.55.Final.jar:4.1.55.Final]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.springframework.core.codec.DecodingException: JSON decoding error: No content to map due to end-of-input; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
at [Source: (io.netty.buffer.ByteBufInputStream); line: -1, column: 0]
at org.springframework.http.codec.json.AbstractJackson2Decoder.processException(AbstractJackson2Decoder.java:228) ~[spring-web-5.3.2.jar:5.3.2]
at org.springframework.http.codec.json.AbstractJackson2Decoder.decode(AbstractJackson2Decoder.java:186) ~[spring-web-5.3.2.jar:5.3.2]
at org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver.lambda$decodeContent$1(PayloadMethodArgumentResolver.java:235) ~[spring-messaging-5.3.2.jar:5.3.2]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) ~[reactor-core-3.4.1.jar:3.4.1]
... 83 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
at [Source: (io.netty.buffer.ByteBufInputStream); line: -1, column: 0]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[jackson-databind-2.11.3.jar:2.11.3]
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1468) ~[jackson-databind-2.11.3.jar:2.11.3]
at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:360) ~[jackson-databind-2.11.3.jar:2.11.3]
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2064) ~[jackson-databind-2.11.3.jar:2.11.3]
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1453) ~[jackson-databind-2.11.3.jar:2.11.3]
at org.springframework.http.codec.json.AbstractJackson2Decoder.decode(AbstractJackson2Decoder.java:181) ~[spring-web-5.3.2.jar:5.3.2]
... 85 common frames omitted
Expected behavior:
When the client sends an empty Flux, the reactive pipeline on the server side should complete normally instead of failing with the exception above.
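To illustrate the expected behavior in isolation: the trace shows the JSON decoder being handed input that ends immediately ("line: -1, column: 0"), which suggests a zero-length payload reached the decoder even though the client Flux was empty. The following is a minimal plain-Java sketch of that idea, not Spring's or RSocket's actual code. The class `EmptyPayloadSketch`, its methods, and the use of ISO-8601 text instead of JSON are all hypothetical and chosen only to keep the example self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only; none of these names exist in Spring or RSocket.
class EmptyPayloadSketch {

    // Stand-in for a JSON decoder: rejects zero-length input, similar in spirit
    // to Jackson's "No content to map due to end-of-input".
    static Instant decode(byte[] payload) {
        if (payload.length == 0) {
            throw new IllegalArgumentException("No content to map due to end-of-input");
        }
        return Instant.parse(new String(payload, StandardCharsets.UTF_8));
    }

    // Decodes every frame, including zero-length ones (the failing behavior).
    static List<Instant> decodeAll(List<byte[]> frames) {
        return frames.stream()
                .map(EmptyPayloadSketch::decode)
                .collect(Collectors.toList());
    }

    // Skips zero-length frames before decoding, so an "empty" channel
    // yields an empty result instead of a decoding error.
    static List<Instant> decodeSkippingEmpty(List<byte[]> frames) {
        return frames.stream()
                .filter(frame -> frame.length > 0)
                .map(EmptyPayloadSketch::decode)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumption: an empty client Flux arrives as a single zero-length payload.
        List<byte[]> frames = List.of(new byte[0]);

        System.out.println(decodeSkippingEmpty(frames)); // prints []

        try {
            decodeAll(frames);
        } catch (IllegalArgumentException e) {
            System.out.println("decodeAll failed: " + e.getMessage());
        }
    }
}
```

With this sketch, `decodeSkippingEmpty` returns an empty list for the empty-channel case, which corresponds to the handler's `doOnComplete` firing without any `doOnNext`, while `decodeAll` reproduces the kind of failure reported above.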
For details, see https://github.com/MastaP/reactive-demo
RSocket handler: https://github.com/MastaP/reactive-demo/blob/master/src/main/java/com/gt/example/reactivedemo/RSocketServerHandler.java
Tests: https://github.com/MastaP/reactive-demo/blob/master/src/test/java/com/gt/example/reactivedemo/BaseDemoTests.java