
Inbound stream cancellation does not propagate in Spring Boot RSocket Reactive #26520

@dvankley

Description

Affects: 5.3.3
(Spring Boot 2.4.2)


Expected

The RSocket channel endpoint in my Spring Boot application receives a cancellation signal from the inbound, client-driven stream, allowing server-side cleanup, etc.

Actual

My reactive stream receives no cancellation, completion, or error signal, whether it is a Flux or a Flow.

Setup

Relevant dependencies:

  • Spring Boot 2.4.2
  • Kotlin 1.4.21
  • Kotlinx Coroutines 1.4.2
  • RSocket Core 1.1.0 (also tried 1.1.1-SNAPSHOT, which suppresses "Operator called default onErrorDropped" but still doesn't propagate the cancellation)

I have tried to achieve my goal with both Kotlin coroutine Flows and Reactor Flux(en?). Both client/server pairs below should do the same thing: establish an RSocket channel, send two "ping" payloads from the client, have the server respond to each with a "pong" payload, and then have the client close the connection.

Flow server side:

    @MessageMapping("testFlow")
    fun testPingFlow(input: Flow<String>): Flow<String> {
        val cs = CoroutineScope(EmptyCoroutineContext)
        val output = MutableSharedFlow<String>(10)

        cs.launch {
            try {
                input
                    .catch { e ->
                        logger.error("Rsocket server input error", e)
                    }
                    .onCompletion { exception ->
                        logger.debug("Rsocket server input completed")
                        if (exception != null) {
                            logger.error("Exception received while processing Rsocket server input flow", exception)
                        }
                    }
                    // A plain .collect { } resolves to the internal-only member unless kotlinx.coroutines.flow.collect is imported
                    .collectIndexed { _, message ->
                        logger.debug("Rsocket server input received $message")
                        output.emit("pong ${System.currentTimeMillis()}")
                    }
            } catch (e: Throwable) {
                logger.error("Rsocket server input connection exception caught", e)
            }
        }
        return output
    }

Flow client side test:

    @Test
    fun testPingFlow() {
        val outToServer = MutableSharedFlow<String>(10)

        runBlocking {
            val socketFlow = rSocketRequester
                .route("testFlow")
                .data(outToServer.asFlux())
                .retrieveFlow<String>()
                .take(2)

            outToServer.emit("Ping ${System.currentTimeMillis()}")
            outToServer.emit("Ping ${System.currentTimeMillis()}")

            socketFlow
                .onCompletion { exception ->
                    logger.debug("Rsocket client output completed")
                    if (exception != null) {
                        logger.error("Exception received while processing Rsocket client output flow", exception)
                    }
                }
                .collect { message ->
                    logger.debug("Received pong from server $message")
                }
        }
    }

Flux server side:

    @MessageMapping("testFlux")
    fun testPingFlux(input: Flux<String>): Flux<String> {
        val output = Sinks.many().unicast().onBackpressureBuffer<String>()
        try {
            input
                .doOnNext { message ->
                    logger.debug("Rsocket server input message received $message")
                }
                .doOnError { e ->
                    logger.error("Rsocket server input connection error", e)
                }
                .doOnCancel {
                    logger.debug("Rsocket server input cancelled")
                }
                .doOnComplete {
                    logger.debug("Rsocket server input completed")
                }
                .subscribe { message ->
                    output.tryEmitNext("pong ${System.currentTimeMillis()}")
                }
        } catch (e: Throwable) {
            logger.error("Rsocket server input connection exception caught", e)
        }
        return output.asFlux()
    }

Flux client side test:

    @Test
    fun testPingFlux() {
        val outToServer = Sinks.many().unicast().onBackpressureBuffer<String>()

        rSocketRequester
            .route("testFlux")
            .data(outToServer.asFlux())
            .retrieveFlux<String>()
            .doOnCancel {
                logger.debug("Rsocket client output connection completed")
            }
            .doOnError { e ->
                logger.error("Exception received while processing Rsocket client output flow", e)
            }
            .take(2)
            .subscribe { message ->
                logger.debug("Received pong from server $message")
            }

        outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
        outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
    }

The Problem

Both client/server snippets above do in fact send ping/pong payloads back and forth, but in neither case does the server side observe the client cancelling the connection. I get my own "Rsocket client output completed" log line from the client side, then "Operator called default onErrorDropped" from Reactor, followed by this stack trace from RSocket:

java.util.concurrent.CancellationException: Inbound has been canceled
	at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357) ~[rsocket-core-1.1.0.jar:na]
	at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
	at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
	at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
	at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
	at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
	at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
	at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:267) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:377) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

This is a problem because, beyond this toy example, my application needs to perform server-side cleanup when the connection closes.
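For comparison, here is the signal the server-side code is hoping to observe, shown at the plain Reactive Streams level. This is a minimal sketch that assumes JDK 9+ and uses only the JDK's java.util.concurrent.Flow interfaces, not Reactor, coroutines, or RSocket; CleanupPublisher and TakeTwo are hypothetical names. The subscriber cancels after two items, as .take(2) does in the client tests, and the publisher runs its cleanup hook exactly once on either cancellation or completion, which is roughly the role Reactor's doFinally operator plays on a Flux. It does not reproduce the RSocket path.

```kotlin
import java.util.concurrent.Flow
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical publisher: emits `items` on demand and runs `cleanup` exactly once,
// on whichever terminal event happens first (completion or cancellation).
class CleanupPublisher<T>(
    private val items: List<T>,
    private val cleanup: (reason: String) -> Unit
) : Flow.Publisher<T> {
    override fun subscribe(subscriber: Flow.Subscriber<in T>) {
        val done = AtomicBoolean(false)
        var index = 0

        // Guard so the cleanup hook never runs twice (compare Reactor's doFinally).
        fun finish(reason: String) {
            if (done.compareAndSet(false, true)) cleanup(reason)
        }

        subscriber.onSubscribe(object : Flow.Subscription {
            override fun request(n: Long) {
                var remaining = n
                // Stop emitting as soon as the subscriber cancels from inside onNext.
                while (remaining > 0 && !done.get() && index < items.size) {
                    subscriber.onNext(items[index++])
                    remaining--
                }
                if (!done.get() && index == items.size) {
                    finish("complete")
                    subscriber.onComplete()
                }
            }

            // The upstream-travelling cancel signal that server-side cleanup depends on.
            override fun cancel() = finish("cancel")
        })
    }
}

// Hypothetical subscriber that cancels after two items, like `.take(2)` in the client tests.
class TakeTwo<T>(private val received: MutableList<T>) : Flow.Subscriber<T> {
    private lateinit var subscription: Flow.Subscription

    override fun onSubscribe(subscription: Flow.Subscription) {
        this.subscription = subscription
        subscription.request(Long.MAX_VALUE)
    }

    override fun onNext(item: T) {
        received += item
        if (received.size == 2) subscription.cancel()
    }

    override fun onError(throwable: Throwable) {}
    override fun onComplete() {}
}

fun main() {
    val log = mutableListOf<String>()
    val received = mutableListOf<String>()
    CleanupPublisher(listOf("pong 1", "pong 2", "pong 3")) { reason -> log += "cleanup on $reason" }
        .subscribe(TakeTwo(received))
    println(received) // [pong 1, pong 2]
    println(log)      // [cleanup on cancel]
}
```

In a single in-process pipeline the cancel reaches the publisher directly; the question in this issue is why the server's input Flux/Flow never receives an equivalent terminal signal once RSocket sits in between.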

Things I Have Tried

  • The various operators for catching errors, cancellation, or completion on Flows and Fluxen, many of which are illustrated in the examples above.
  • try/catch blocks in the subscribe/collect lambdas.
  • Coupling the server response Flux/Flow directly to the input Flux/Flow via a map operator rather than creating a separate output Flux/Flow.
  • Stepping through framework code in the debugger, which I am not ashamed to say lost me pretty quickly. My best theory from this adventure is that the Flux/Flow that receives the cancellation signal is somehow decoupled from the input Flux/Flow that my server method receives, but there are too many layers of abstraction for me to trace it.
  • I asked about this on Stack Overflow and was told this is 'a bug in the reactor switchOnFirst operator which does not propagate an inbound error if outbound has been canceled', and was directed to report a bug here, so here we are. I wonder whether this is really a Project Reactor bug, since switchOnFirst is part of that project, but I was directed here by Oleh Dokuka, who wrote that class, so I assume he knows what he's talking about.
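To make the Stack Overflow explanation above a little more concrete, here is a toy model in plain Kotlin. It is not Reactor's switchOnFirst or Spring's code: ToyInboundBridge and simulateClientCancel are hypothetical names, and the ordering (the responder cancels the outbound side first, then fails the inbound with the CancellationException seen in the stack trace) is an assumption based on that explanation and the trace. With propagateAfterOutboundCancel = false the error lands in droppedErrors, analogous to Reactor's onErrorDropped hook, and the handler sees nothing, matching the behaviour described above; with true, the handler's input would see the error, which is the expected behaviour.

```kotlin
import java.util.concurrent.CancellationException

// Toy model, not Reactor's implementation: an operator sitting between the transport's
// inbound stream and the handler's input Flux/Flow. The flag switches between the
// behaviour described in the Stack Overflow answer and the expected one.
class ToyInboundBridge(private val propagateAfterOutboundCancel: Boolean) {
    private var outboundCancelled = false
    val handlerSignals = mutableListOf<String>()   // what the @MessageMapping method would observe
    val droppedErrors = mutableListOf<Throwable>() // stands in for Reactor's onErrorDropped hook

    fun cancelOutbound() {
        outboundCancelled = true
    }

    fun inboundError(error: Throwable) {
        if (outboundCancelled && !propagateAfterOutboundCancel) {
            droppedErrors += error // error is swallowed: the handler never hears about it
        } else {
            handlerSignals += "onError: ${error.message}"
        }
    }
}

// Assumed order for a client CANCEL on a request-channel: cancel outbound, then fail inbound.
fun simulateClientCancel(bridge: ToyInboundBridge) {
    bridge.cancelOutbound()
    bridge.inboundError(CancellationException("Inbound has been canceled"))
}

fun main() {
    val reported = ToyInboundBridge(propagateAfterOutboundCancel = false)
    simulateClientCancel(reported)
    println(reported.handlerSignals)                   // []
    println(reported.droppedErrors.map { it.message }) // [Inbound has been canceled]

    val expected = ToyInboundBridge(propagateAfterOutboundCancel = true)
    simulateClientCancel(expected)
    println(expected.handlerSignals)                   // [onError: Inbound has been canceled]
}
```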

Metadata

Labels: for: external-project (Needs a fix in external project), in: messaging (Issues in messaging modules (jms, messaging))