Skip to content

Fix RSocket Fire and forget handling with Kotlin #23866

@denyshorman

Description

@denyshorman

The following code with suspend function throws Missing 'rsocketResponse' exception.
println prints data to the console as expected. It works only when the method is rewritten with Mono: fun enter(data: Any): Mono<Void>.
The same issue is reproduced for methods annotated with @ConnectMapping.

Reproduced with Spring Boot 2.2.0.RELEASE and 2.2.1.SNAPSHOT.

@Controller
class Api {
    @MessageMapping("test")
    suspend fun enter(data: Any) {
        println(data)
    }
}

Stacktrace:

java.lang.IllegalArgumentException: Missing 'rsocketResponse'
	at org.springframework.util.Assert.notNull(Assert.java:198)
	at org.springframework.messaging.rsocket.annotation.support.RSocketPayloadReturnValueHandler.handleEncodedContent(RSocketPayloadReturnValueHandler.java:65)
	at org.springframework.messaging.handler.invocation.reactive.AbstractEncoderMethodReturnValueHandler.lambda$handleReturnValue$0(AbstractEncoderMethodReturnValueHandler.java:124)
	at org.springframework.messaging.handler.invocation.reactive.ChannelSendOperator$WriteBarrier.onComplete(ChannelSendOperator.java:251)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1820)
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:129)
	at kotlinx.coroutines.reactor.MonoCoroutine.onCompleted(Mono.kt:71)
	at kotlinx.coroutines.AbstractCoroutine.onCompletionInternal(AbstractCoroutine.kt:102)
	at kotlinx.coroutines.JobSupport.tryFinalizeSimpleState(JobSupport.kt:275)
	at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:807)
	at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:787)
	at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:111)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.DispatchedKt.resumeCancellable(Dispatched.kt:457)
	at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:26)
	at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:109)
	at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:154)
	at kotlinx.coroutines.reactor.MonoKt$monoInternal$1.accept(Mono.kt:60)
	at kotlinx.coroutines.reactor.MonoKt$monoInternal$1.accept(Mono.kt)
	at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:53)
	at org.springframework.messaging.handler.invocation.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:85)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
	at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247)
	at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2148)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:132)
	at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:318)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:145)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4087)
	at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
	at io.rsocket.RSocketResponder.handleFireAndForget(RSocketResponder.java:366)
	at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:296)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
	at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
	at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
	at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
	at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
	at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:329)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:457)
	at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:153)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:830)

Metadata

Metadata

Assignees

Labels

in: messagingIssues in messaging modules (jms, messaging)type: bugA general bug

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions