You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Exception:java.lang.IllegalStateException: Only one connection receive subscriber allowed.
The above error should not be thrown when using WebClient.
Actual Behavior
java.lang.IllegalStateException: Only one connection receive subscriber allowed.
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:182)
at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:334)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onError(MonoCollect.java:144)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:165)
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:167)
at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onNext(FluxDoOnEach.java:173)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:189)
at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:414)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:671)
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:183)
at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:439)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:637)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Sometimes, the above error is thrown when using WebClient.
Steps to Reproduce
It only occurs in the production environment where the application receives random connections from the internet.
I couldn't reproduce it in a test code.
But, as I wrote below it is thrown if and only if a connection reset is occured.
Possible Solution
When reactor.netty.http.clientHttpClientConfig is initialized, retryDisabled should be set to be true by default.
Comment
As I investigate the issue in this article,
I found that it is thrown if and only if a connection reset is occurred and the first try of the chain is failed.
Because FluxReceive::startReceiver uses if (... ONCE.compareAndSet(this, 0, 1)) expression in the conditional, FluxReceive::startReceiver should not be called more than once.
But there is a retryWhen method and uses it in HttpClientConnect.MonoHttpConnect::subscribe.
I could bypass the issue by calling the method HttpClient::retryDisabled to be true,
but I think there is a bug when retryDisabled is set to be false(it is default.), or shouldRetry to be true.
@trouvaillle You should first update your Reactor Netty version.
Can you provide a reproducible example that shows that exactly retry functionality causes this behaviour? Retry functionality is implemented in a way that it always does a new request and does not subscribe again to the one that failed.
We know that there is an issue in Spring Framework that causes double subscription spring-projects/spring-framework#26699 as you did not provide the full stack trace please check whether the stack trace is the same.
Expected Behavior
Exception:java.lang.IllegalStateException: Only one connection receive subscriber allowed.
The above error should not be thrown when using
WebClient
.Actual Behavior
Sometimes, the above error is thrown when using
WebClient
.Steps to Reproduce
It only occurs in the production environment where the application receives random connections from the internet.
I couldn't reproduce it in a test code.
But, as I wrote below it is thrown if and only if a connection reset is occured.
Possible Solution
When
reactor.netty.http.clientHttpClientConfig
is initialized,retryDisabled
should be set to betrue
by default.Comment
As I investigate the issue in this article,
I found that it is thrown if and only if a connection reset is occurred and the first try of the chain is failed.
Because
FluxReceive::startReceiver
usesif (... ONCE.compareAndSet(this, 0, 1))
expression in the conditional,FluxReceive::startReceiver
should not be called more than once.But there is a
retryWhen
method and uses it inHttpClientConnect.MonoHttpConnect::subscribe
.I could bypass the issue by calling the method
HttpClient::retryDisabled
to be true,but I think there is a bug when
retryDisabled
is set to befalse
(it is default.), orshouldRetry
to betrue
.Your Environment
io.projectreactor.netty:reactor-netty-http:1.0.14
netty
, ...):io.projectreactor.netty:reactor-netty-core:1.0.14
java -version
): openjdk version "1.8.0_312"uname -a
): Darwin Kernel Version 21.6.0: Mon Aug 22 20:19:52 PDT 2022; root:xnu-8020.140.49~2/RELEASE_ARM64_T6000 arm64The text was updated successfully, but these errors were encountered: