Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

random cancel() in reactor netty channelOperation terminate() #2936

Closed
jtorkkel opened this issue Oct 17, 2023 · 3 comments
Closed

random cancel() in reactor netty channelOperation terminate() #2936

jtorkkel opened this issue Oct 17, 2023 · 3 comments
Assignees
Labels
status/invalid We don't feel this issue is valid

Comments

@jtorkkel
Copy link

Mono invokes cancel() in addition of onComplete() or cancel() instead of onComplete() resulting incorrect observations metrics

Current situation:

We are using SpringBoot 3.1 in preproduction and we have noticed that new longTaskTimer metrics "spring_security_filterchains_active_seconds_active_count{spring_security_reached_filter_name="none",spring_security_reached_filter_section="after"}" keep increasing constantly. We have also noted that http_server_request metrics reports inaccurate request counts and status="UNKNOWN", outcome="UNKNOWN".

In 24h tests where we made 50 Million requests to service using ~20 parallel threads using connection pool, "spring_security_filterchains_active_seconds_active_count increases to value 8000, after 10d the value was still 8000 (server was running, no more requests).
At the same time http_server_requests count was 20000 larger than netty_server metrics, hinting that some requests calculated twice.

Same issue happen when test client like Jmeter is forced to "stop" requests before getting response. When repeated > 10000 times the spring_security_filterchains_active_seconds_active_count increase to over 10000 indicating no max limit in single longTaskTimermetrics. Each longTaskTimeris stored to micrometer registry, so this is considered as memory leak and eventually service will OOM or slow down.

The cancel() also happen duding few second tests where connection pool is used and no socket close during the tests.

Our analysis indicates that

  • When rest controller use Mono, normally stream ends to onComplete(), but randomly there is onComplete()+onCancel() or just cancel(). Interesting that cancel() based on logs timestamp happen more than 5ms later than onComplete.
  • When rest controller use Flux, stream end always to onComplete(), there are no extra internal cancel()
  • when external client cancel the requests (user switch apps in their mobile the) or requests timeouts and the client forced stop result external cancel() as expected, also for Flux

In all these cases the cancel() cause the following issues

  1. http_server_requests metrics reports metrics twice when onComplete+OnCancel is detected. One metrics reported as status="200, outcome="SUCCESS", another as status="UNKNOWN", outcome="UNKNOWN"
  2. http_server_requests metrics reports status="UNKNOWN", outcome="UNKNOWN" when there is only cancel()
  3. spring_security_filterchains_active_seconds_active_count "after" filter is always started, even when cancel() happen before filter started, and never stopped when there is cancel()
  4. http_server_requests filter is always stopped, also with external and internal cancel()

Issue status:

  • Random cancel(), root cause unknown, this ISSUE.
  • spring_security_filterchains_active_seconds_active_count "after" is always started, also after external cancel(), but onCancel does not stop it, including external triggered cancel(), 14031
  • I reported 1 and 2 in #31365, fixed #31417 (OnComplete()+Cancel()), and related status="UNKNOWN fixed in #31388

For example after 5000 request, 10 threads, 500 requests the metrics might show 4836.0 status="200", 190.0 status="UNKNOWNN totalling 5026 requests, when netty shown 5000 was handled.
Similar way 135 spring_security_filterchain "after" longTaskTimer were created but never stopped resulting leak.

spring_security_filterchains_active_seconds_active_count{spring_security_filterchain_position="0",spring_security_filterchain_size="0",spring_security_reached_filter_name="none",spring_security_reached_filter_section="after",} 135.0
spring_security_filterchains_seconds_count{error="none",spring_security_filterchain_position="13",spring_security_filterchain_size="13",spring_security_reached_filter_name="ServerWebExchangeReactorContextWebFilter",spring_security_reached_filter_section="after",} 4853.0

custom_channel_invocations_total{source="None",uri="/monoCancelNoLog",} 5000.0
reactor_netty_http_server_data_sent_bytes_count{uri="/",} 5000.0
reactor_netty_http_server_data_received_time_seconds_count{method="GET",uri="/",} 5000.0

http_server_requests_seconds_count{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/monoCancelNoLog",} 4836.0
http_server_requests_seconds_count{error="none",exception="none",method="GET",outcome="UNKNOWN",status="UNKNOWN",uri="/monoCancelNoLog",} 190.0

Desired situation:

  • there should not be random internal cancel()
  • spring_security_filterchains_active_seconds_active_count should handle the cancel and probably report status success/cancel in spring_security_filterchains_seconds_count metrics, 14031

cancel() is not impacting to business logic and actual data streams, in testing no errors seen in HTTP responses. It seems only to impact "observations".

Environment:

  • SpringBoot 3.1.4 (also happen with SpringBoot 3.0, and cancel() also seen similar way in 2.5, 2.7.14 where no observations --> no longRunningTimer
  • Java 17
  • Linux, reproduced in Windows. In windows cancel() rate might be close to % whereas in Linux might be ~0.1%. So VM specific. "Not happening with single thread"

Repro steps:

We initilly thought that this only happen with JWT but cancel() and spring_security_filterchains_active_seconds_active_count reproduce repro also with BASIC auth and also without authentication.
Similar way it repro with external calls like WebClient, calls but it also reproduce with restController simple Mono handler.

Attaced example application included which can be used to repro the cancel() and spring_security_filterchains_active_seconds_active_count increase

example.zip

How to repro random cancel():
Make few thousands calls to endpoint "GET localhost:8081/monoCancel" with minimum 10 threads. If no cancel() use 50 threads. cancel() and spring_security_filterchains_active_seconds_active_count increase is not reproducing if less threads are used.
Also seems that in Windows cancel() is much more frequent than in linux, in Windows even 5-10% of the calls might result cancel().
In Windows I can repro always with 10 threads, and 100 request on each thread, i I use 10 threads, and 10 requests each I have to try few times.

If too many console entries without cancel(), use "GET localhost:8081/monoCancelNoLog", then you see only stackTrace when cancel() happen

/monoCancel is the simplest possible restController

    // prints whole reactive stream, onSubscribe, request, onNext, onComplete and onCancel and stackTrace 
    @GetMapping("monoCancel")
    public Mono<String> monoCancel(){
        return Mono.just("monoCancel").log().doOnCancel(() -> new Exception().printStackTrace());
    }

    // print only stackTrace when cancel
    @GetMapping("monoCancelNoLog")
    public Mono<String> monoCancelnoLog(){
        return Mono.just("monoCancelNoLog").doOnCancel(() -> new Exception().printStackTrace());
    }

Expected results with onComplete() and no cancel() when two call made using single thread to "GET localhost:8081/monoCancel"


//GET /status
2023-10-17T10:37:43.668+03:00 DEBUG 2820 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [bf0a4c4b, L:/127.0.0.1:8081 - R:/127.0.0.1:33681] New http connection, requesting read
2023-10-17T10:37:43.685+03:00 DEBUG 2820 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [bf0a4c4b, L:/127.0.0.1:8081 - R:/127.0.0.1:33681] Increasing pending responses, now 1
2023-10-17T10:37:43.717+03:00 ERROR 2820 --- [ctor-http-nio-3] e.e.CustomDefaultMeterObservationHandler : STARTname=http.server.requests uri=keyValue(uri=UNKNOWN)
2023-10-17T10:37:43.784+03:00  INFO 2820 --- [     parallel-1] reactor.Mono.Just.1                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2023-10-17T10:37:43.784+03:00  INFO 2820 --- [     parallel-1] reactor.Mono.Just.1                      : | request(unbounded)
2023-10-17T10:37:43.785+03:00  INFO 2820 --- [     parallel-1] reactor.Mono.Just.1                      : | onNext(monoCancel)
2023-10-17T10:37:43.788+03:00  INFO 2820 --- [     parallel-1] reactor.Mono.Just.1                      : | onComplete()
2023-10-17T10:37:43.797+03:00 DEBUG 2820 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [bf0a4c4b-1, L:/127.0.0.1:8081 - R:/127.0.0.1:33681] Decreasing pending responses, now 0
2023-10-17T10:37:43.798+03:00 ERROR 2820 --- [ctor-http-nio-3] e.e.CustomDefaultMeterObservationHandler : STARTname= spring.security.filterchains, spring.security.reached.filter.section=after
2023-10-17T10:37:43.798+03:00 ERROR 2820 --- [ctor-http-nio-3] e.e.CustomDefaultMeterObservationHandler : STOPname= spring.security.filterchains, spring.security.reached.filter.section=after
2023-10-17T10:37:43.798+03:00 ERROR 2820 --- [ctor-http-nio-3] e.e.CustomDefaultMeterObservationHandler : STOPname=http.server.requests uri=keyValue(uri=/monoCancel)
2023-10-17T10:37:43.802+03:00 DEBUG 2820 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [bf0a4c4b-1, L:/127.0.0.1:8081 - R:/127.0.0.1:33681] Last HTTP response frame
2023-10-17T10:37:43.804+03:00 DEBUG 2820 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [bf0a4c4b-1, L:/127.0.0.1:8081 - R:/127.0.0.1:33681] Last HTTP packet was sent, terminating the channel

2023-10-17T10:37:43.805+03:00 DEBUG 2820 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [bf0a4c4b, L:/127.0.0.1:8081 - R:/127.0.0.1:33681] Increasing pending responses, now 1
2023-10-17T10:37:43.805+03:00 ERROR 2820 --- [ctor-http-nio-3] e.e.CustomDefaultMeterObservationHandler : STARTname=http.server.requests uri=keyValue(uri=UNKNOWN)
2023-10-17T10:37:43.808+03:00  INFO 2820 --- [     parallel-2] reactor.Mono.Just.2                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2023-10-17T10:37:43.808+03:00  INFO 2820 --- [     parallel-2] reactor.Mono.Just.2                      : | request(unbounded)
2023-10-17T10:37:43.808+03:00  INFO 2820 --- [     parallel-2] reactor.Mono.Just.2                      : | onNext(monoCancel)
2023-10-17T10:37:43.809+03:00  INFO 2820 --- [     parallel-2] reactor.Mono.Just.2                      : | onComplete()
2023-10-17T10:37:43.810+03:00 DEBUG 2820 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [bf0a4c4b-2, L:/127.0.0.1:8081 - R:/127.0.0.1:33681] Decreasing pending responses, now 0
2023-10-17T10:37:43.811+03:00 ERROR 2820 --- [ctor-http-nio-3] e.e.CustomDefaultMeterObservationHandler : STARTname= spring.security.filterchains, spring.security.reached.filter.section=after
2023-10-17T10:37:43.811+03:00 ERROR 2820 --- [ctor-http-nio-3] e.e.CustomDefaultMeterObservationHandler : STOPname= spring.security.filterchains, spring.security.reached.filter.section=after
2023-10-17T10:37:43.811+03:00 ERROR 2820 --- [ctor-http-nio-3] e.e.CustomDefaultMeterObservationHandler : STOPname=http.server.requests uri=keyValue(uri=/monoCancel)
2023-10-17T10:37:43.812+03:00 DEBUG 2820 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [bf0a4c4b-2, L:/127.0.0.1:8081 - R:/127.0.0.1:33681] Last HTTP response frame
2023-10-17T10:37:43.812+03:00 DEBUG 2820 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [bf0a4c4b-2, L:/127.0.0.1:8081 - R:/127.0.0.1:33681] Last HTTP packet was sent, terminating the channel

In above logs threre are reactor netty HttpServerOperations logs enabled to show that client pooling works, socket reused.
CustomDefaultMeterObservationHandler extra logging is enabled which logs http_server_requests and spring_security_filterchains onStart and onStop.
CustomWebFilter is disabled because reactive stream logging enabled in API, CustomWebFilter can be used to log API not having log enabled
CustomServerRequestObservationConvention customize http_server__requests status, outceme an exception values, status will be fixed in next springFW.
They are enabled using properties

logging.level.reactor.netty.http.server.HttpServerOperations=TRACE
example.CustomDefaultMeterObservationHandler=true
example.CustomWebFilter=false
example.CustomServerRequestObservationConvention=false

This result show random cancel() on request number 99 and another test slighly later in sample 761. I was using 10 thread, 100 request each and use console seach to find cancel

Extra cancel(), only reactor HttpServerOperations logging enabled.
2023-10-17T10:09:10.911+03:00  INFO 16708 --- [    parallel-12] reactor.Mono.Just.99                     : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2023-10-17T10:09:10.911+03:00  INFO 16708 --- [    parallel-12] reactor.Mono.Just.99                     : | request(unbounded)
2023-10-17T10:09:10.911+03:00  INFO 16708 --- [    parallel-12] reactor.Mono.Just.99                     : | onNext(monoCancel)
2023-10-17T10:09:10.911+03:00  INFO 16708 --- [    parallel-12] reactor.Mono.Just.99                     : | onComplete()
2023-10-17T10:09:10.917+03:00  INFO 16708 --- [ctor-http-nio-9] reactor.Mono.Just.99                     : | cancel()

In second test only 10x10 but had to run twice to get once cancel(). Extra onStart and onStop cannot be correlated and not show here, but when cancel() exists the STOPname= spring.security.filterchains is missing incicating that cancel() is not handled and because of that longTaskTimermetrics keep running forever

What is interesting here is that cancel() timestamp is 5-6ms after onComplete(), maybe the latencies trigger extra cancel() somehow.

Full log in attached file !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

2023-10-17T11:15:25.705+03:00  INFO 9724 --- [     parallel-8] reactor.Mono.Just.129                    : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2023-10-17T11:15:25.705+03:00  INFO 9724 --- [     parallel-8] reactor.Mono.Just.129                    : | request(unbounded)
2023-10-17T11:15:25.705+03:00  INFO 9724 --- [     parallel-8] reactor.Mono.Just.129                    : | onNext(monoCancel)
2023-10-17T11:15:25.705+03:00  INFO 9724 --- [     parallel-8] reactor.Mono.Just.129                    : | onComplete()
2023-10-17T11:15:25.711+03:00  INFO 9724 --- [ctor-http-nio-4] reactor.Mono.Just.129                    : | cancel()
java.lang.Exception
	at com.example.example.ExampleController.lambda$monoCancel$0(ExampleController.java:28)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:152)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176)
	at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnCancel(MonoSingle.java:108)
	at reactor.core.publisher.Operators$MonoInnerProducerBase.cancel(Operators.java:2931)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:134)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.cancel(FluxPeekFuseable.java:452)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.cancel(FluxPeekFuseable.java:452)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.cancel(FluxPeekFuseable.java:452)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.cancel(FluxDoOnEach.java:113)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.cancel(FluxPeekFuseable.java:798)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
	at reactor.core.publisher.Operators.terminate(Operators.java:1277)
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:481)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

Enabling also logging.level.reactor.netty.channel.ChannelOperations=TRACE we can see different exception during the cancel
Full log in and picture in attachment

channelOperationExceptionWhenCancel
channelOperationsExceptionCancel.txt

2023-10-17T12:11:00.851+03:00  INFO 29380 --- [     parallel-4] reactor.Mono.Just.378                    : | onComplete()

java.lang.Exception: ChannelOperation terminal stack
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:477) ~[reactor-netty-core-1.1.11.jar:1.1.11]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:174) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:167) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2023-10-17T12:11:00.855+03:00  INFO 29380 --- [ctor-http-nio-8] reactor.Mono.Just.378                    : | cancel()

How to repro external cancel():
Simply make single call to call "GET localhost:8081/monoCancelDelay" and stop request within 5s. Each stopped call result cancel() and spring_security_filterchains_active_seconds_active_count increase ny one
Can be also tested using api "GET localhost:8081/statusLocalDelay" which make calls webclient call to own status/health with delay allowing client to be stopped or making webclient call to external API using /status and having delay in response

    @GetMapping("monoCancelDelay")
    public Mono<String> monoCancelSleep(){
        return Mono.just("monoCancelDelay").delayElement(Duration.ofSeconds(5)).log().doOnCancel(() -> new Exception().printStackTrace());
    }


    @GetMapping("monoCancelNoLogDelay")
    public Mono<String> monoCancelnoLogSleep(){
        return Mono.just("monoCancelNoLogDelay").delayElement(Duration.ofSeconds(5)).doOnCancel(() -> new Exception().printStackTrace());
    }

Example app also contains extra features which were used during repro

  • netty server and client metrics (checks the real number of requests), enable ogs using logging.level.reactor.netty.http.server.HttpServerOperations=TRACE, logging.level.reactor.netty.channel.ChannelOperations=TRACE
  • CustomChannelId filter metrics (check the number filters called)
  • CustomWebFilter to log reactive stream (enable/disable using property example.CustomWebFilter=false/true)
  • CustomServerRequestObservationConvention which fixes the status #31388 and custom fix to exception and outcome, example.CustomServerRequestObservationConvention=false/false
  • CustomDefaultMeterObservationHandler to print filter onStart/onStop (enable/disable using property example.CustomDefaultMeterObservationHandler=false/true)
  • I did not get fix to #31417 working as my fix created additional DefaultMeterObservationHandler , but counted metrics did not include extra entries so apparently fix works.

There are also few other API

  • /monoCancelBasicAuth, with basic auth, but cancel() and active leak happen also without credentials in use
  • /fluxSuccess, issue not happening with Flux, unless external cancel
  • /fluxSuccessBasicAuth, flux with basic auth, no random cance()
  • /status, calls external rest http://localhost:11112/mx/status usingwebClient
  • /statusLocal, calls internal rest http://localhost:8081/mx/status usingwebClient
  • /statusFlux and /statusFluxLocal, use flux instead of Mono, no random cancel()
    Use mock server to external server, adding delay allow cancel while waiting for response.
logging.level.reactor.netty.http.server.HttpServerOperations=TRACE
logging.level.reactor.netty.channel.ChannelOperations=TRACE
example.CustomDefaultMeterObservationHandler=true
example.CustomWebFilter=false
example.CustomServerRequestObservationConvention=false

Additional test results:

TEST 1:
Calling GET localhost:8081/status, which call external service using webClient

2023-10-16T17:18:59.027+03:00 INFO 26052 --- [ parallel-11] reactor.Mono.Defer.1459 : onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
2023-10-16T17:18:59.027+03:00 INFO 26052 --- [ parallel-11] reactor.Mono.Defer.1459 : request(unbounded)
2023-10-16T17:18:59.033+03:00 INFO 26052 --- [ctor-http-nio-2] reactor.Mono.Defer.1459 : cancel()
2023-10-16T17:18:59.032+03:00 INFO 26052 --- [ parallel-11] reactor.Mono.Defer.1459 : onComplete()

TEST 2: cancel only, no onComplete. I cannot repro with example application, but easily repro with real service. Below extract from logs

2023-10-12T09:58:00.474+03:00 INFO [service-v3,,] 25340 --- [ctor-http-nio-3] reactor.Mono.Defer.11446 : onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
2023-10-12T09:58:00.474+03:00 INFO [service-v3,,] 25340 --- [ctor-http-nio-3] reactor.Mono.Defer.11446 : request(unbounded)
2023-10-12T09:58:00.630+03:00 INFO [service-v3,,] 25340 --- [ctor-http-nio-3] reactor.Mono.Defer.11446 : cancel()
java.lang.Exception
at com.example.example.ExampleController.lambda$monoCancel$0(ExampleController.java:27)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:152)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176)
at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnCancel(MonoSingle.java:108)
at reactor.core.publisher.Operators$MonoInnerProducerBase.cancel(Operators.java:2931)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.cancel(FluxPeekFuseable.java:798)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:134)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.cancel(FluxPeekFuseable.java:452)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.cancel(FluxPeekFuseable.java:452)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.cancel(FluxPeekFuseable.java:452)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.cancel(FluxDoOnEach.java:113)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.cancel(FluxPeekFuseable.java:798)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.Operators.terminate(Operators.java:1277)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:481)
at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)

TEST 3: external Cancel(), GET localhost:8081/status and canceled during slow external webclient call

2023-10-16T17:16:29.989+03:00 ERROR 26052 --- [ctor-http-nio-6] e.e.CustomDefaultMeterObservationHandler : STARTname=http.server.requests
2023-10-16T17:16:30.080+03:00 INFO 26052 --- [ parallel-2] reactor.Mono.Defer.2 : onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
2023-10-16T17:16:30.080+03:00 INFO 26052 --- [ parallel-2] reactor.Mono.Defer.2 : request(unbounded)
2023-10-16T17:16:30.995+03:00 WARN 26052 --- [ctor-http-nio-6] reactor.netty.channel.FluxReceive : [22f5e991-1, L:/127.0.0.1:8081 - R:/127.0.0.1:34772] An exception has been observed post termination, use DEBUG level to see the full stack: java.net.SocketException: Connection reset
2023-10-16T17:16:30.995+03:00 ERROR 26052 --- [ctor-http-nio-6] e.e.CustomDefaultMeterObservationHandler : STOPname=http.server.requests
2023-10-16T17:16:30.996+03:00 ERROR 26052 --- [ctor-http-nio-6] e.e.CustomDefaultMeterObservationHandler : STARTname= spring.security.filterchains, spring.security.reached.filter.section=after
2023-10-16T17:16:30.996+03:00 INFO 26052 --- [ctor-http-nio-6] reactor.Mono.Defer.2 : cancel()
java.lang.Exception
at com.example.example.ExampleController.lambda$status$3(ExampleController.java:69)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:152)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176)
at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnCancel(MonoSingle.java:108)
at reactor.core.publisher.Operators$MonoInnerProducerBase.cancel(Operators.java:2931)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.cancel(FluxPeekFuseable.java:798)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:134)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.cancel(FluxPeekFuseable.java:452)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.cancel(FluxPeekFuseable.java:452)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:199)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.cancel(FluxPeekFuseable.java:452)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:207)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.cancel(FluxDoOnEach.java:113)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.cancel(FluxPeekFuseable.java:798)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2399)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2179)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.cancel(MonoIgnoreThen.java:143)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.Operators.terminate(Operators.java:1277)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:481)
at reactor.netty.http.server.HttpServerOperations.onInboundClose(HttpServerOperations.java:696)
at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:73)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at reactor.netty.http.server.AbstractHttpServerMetricsHandler.channelInactive(AbstractHttpServerMetricsHandler.java:107)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376)
at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)

@jtorkkel jtorkkel added status/need-triage A new issue that still need to be evaluated as a whole type/bug A general bug labels Oct 17, 2023
@violetagg violetagg removed the status/need-triage A new issue that still need to be evaluated as a whole label Oct 23, 2023
@violetagg violetagg self-assigned this Oct 23, 2023
@violetagg
Copy link
Member

violetagg commented Oct 25, 2023

@jtorkkel I am able to reproduce what you are observing. The random cancel() is not random, it works as expected and as explained here spring-projects/spring-framework#31365 (comment) it may happened that cancel and onComplete can happen simultaneously.

Let me explain in details how Reactor Netty works:

From one side we have Publisher that publishes in a thread different than Netty event loops. On the other hand we have Netty events published always in event loops.

When Netty is able to send everything over the network it will notify Reactor Netty. This happens here:

//Try to defer the disposing to leave a chance for any synchronous complete following this callback
if (!ops.isSubscriptionDisposed()) {
ch.eventLoop()
.execute(((HttpServerOperations) ops)::terminate);
}
else {
//if already disposed, we can immediately call terminate
((HttpServerOperations) ops).terminate();
}

If onComplete was already received we invoke immediately ((HttpServerOperations) ops).terminate(); however if that's not the case, we will try to delay a bit the termination and in order to do this we schedule ((HttpServerOperations) ops).terminate(); as a task in the event loop (this is the stack that you are seeing as a random cancel()).
When time comes and this task is executed it may happen that cancel and onComplete may be invoked simultaneously.

I do not see any issue in this behaviour and the random cancel() is actually a deliberately scheduled termination.

Although cancel and onComplete happen simultaneously, Reactor Netty guarantees that the termination happens just once.

Now about the 5ms that you are seeing between onComplete and cancel. I changed a bit your example.

Instead of:

Mono.just("monoCancel").log().doOnCancel(() -> new Exception().printStackTrace())

I'm invoking a logger

Mono.just("monoCancel").log().doOnCancel(() -> logger.trace("", new Exception()))

With this change I can see clearly that the delay is not between onComplete and cancel, but rather between doOnCancel and cancel.

2023-10-25T11:22:13.480+03:00  INFO 28142 --- [    parallel-10] reactor.Mono.Just.641                    : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2023-10-25T11:22:13.480+03:00  INFO 28142 --- [    parallel-10] reactor.Mono.Just.641                    : | request(unbounded)
2023-10-25T11:22:13.480+03:00  INFO 28142 --- [    parallel-10] reactor.Mono.Just.641                    : | onNext(monoCancel)
2023-10-25T11:22:13.480+03:00  INFO 28142 --- [    parallel-10] reactor.Mono.Just.641                    : | onComplete()
2023-10-25T11:22:13.481+03:00 TRACE 28142 --- [ctor-http-nio-2] reactor.netty.gh2936                     : isSubscriptionDisposed()=false
2023-10-25T11:22:13.481+03:00 TRACE 28142 --- [ctor-http-nio-2] com.example.example.ExampleController    : 

java.lang.Exception: null
	at com.example.example.ExampleController.lambda$monoCancel$0(ExampleController.java:31) ~[classes!/:0.0.1-SNAPSHOT]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:152) ~[reactor-core-3.5.12-SNAPSHOT.jar!/:3.5.12-SNAPSHOT]
2023-10-25T11:22:13.495+03:00  INFO 28142 --- [ctor-http-nio-2] reactor.Mono.Just.641                    : | cancel()

I believe that this delay is caused by the logging itself.

I did the following experiment. I did this change in addition to the change with the logging above:

    public ExampleController(WebClient webClient) {
        this.webClient = webClient;
        logger.trace("", new Exception());
    }

Now I do not see anymore delays between doOnCancel and cancel, basically I warmed up my application.

@violetagg violetagg added status/invalid We don't feel this issue is valid for/user-attention This issue needs user attention (feedback, rework, etc...) and removed type/bug A general bug labels Oct 25, 2023
@jtorkkel
Copy link
Author

Thanks,

I can confirm that after fixing this #31417 commit da95542 the http.server.requests counts are accurate.

It means that spring security observation #14031 needs to handle cancel() and prevent duplicates by implementing something similar done in #31417 commit da95542 which prevent cancel() processing after onComplete()

@violetagg
Copy link
Member

Thanks,

I can confirm that after fixing this #31417 commit da95542 the http.server.requests counts are accurate.

It means that spring security observation #14031 needs to handle cancel() and prevent duplicates by implementing something similar done in #31417 commit da95542 which prevent cancel() processing after onComplete()

yes

Let me close this issue. If needed we can reopen it.

@violetagg violetagg removed the for/user-attention This issue needs user attention (feedback, rework, etc...) label Oct 25, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status/invalid We don't feel this issue is valid
Projects
None yet
Development

No branches or pull requests

2 participants