SpringBoot WebClient memory leak when the response is not consumed in the filter #1603

Closed
gdinant opened this issue Apr 14, 2021 · 13 comments
Labels
status/invalid We don't feel this issue is valid

gdinant commented Apr 14, 2021

Hello everyone,

I'm running into a LEAK with WebClient, but I'm not able to reproduce it at all. The ERROR log shows up now and then in all live environments (roughly every 2-3 days).

I have read most of the other issues raised on the subject, but none of them seems to resemble my use case.

I'd be really grateful if anyone could help me out with this.

Expected Behavior

No LEAK

Actual Behavior

Standard stacktrace

LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
	io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:2212)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1382)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
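
For context on what this message means: Netty buffers are reference counted. A new buffer starts with a count of 1, `retain()` increments it, and `release()` decrements it, returning the memory once the count reaches 0. The LEAK error is logged when a buffer is garbage-collected while its count is still above 0. Below is a minimal plain-Java sketch of that contract. `CountedBuffer` is a hypothetical stand-in written for illustration, not Netty's `ByteBuf` implementation, and it throws `IllegalStateException` where Netty uses its own exception type.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a reference-counted buffer (NOT Netty's ByteBuf).
final class CountedBuffer {

    // A freshly allocated buffer is owned by its creator: count starts at 1.
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
        return refCnt.get();
    }

    // Adds one owner. Fails if the buffer was already fully released.
    CountedBuffer retain() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("buffer already released");
            }
            if (refCnt.compareAndSet(current, current + 1)) {
                return this;
            }
        }
    }

    // Removes one owner. Returns true when this call dropped the count to 0,
    // i.e. the point where the memory would go back to the pool.
    boolean release() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("buffer already released");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                return current == 1;
            }
        }
    }

    // True if dropping the last Java reference now would count as a leak.
    boolean wouldLeakIfDropped() {
        return refCnt.get() > 0;
    }

    public static void main(String[] args) {
        CountedBuffer buffer = new CountedBuffer();
        buffer.retain();                        // a second owner, e.g. a retained slice
        System.out.println(buffer.release());   // first owner done
        System.out.println(buffer.release());   // last owner done
        System.out.println(buffer.wouldLeakIfDropped());
    }
}
```

In this model, every code path that takes ownership of a buffer has to end in exactly one `release()`. A path that drops the buffer without releasing it (for example a response body that is never read) leaves the count above 0.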

In "advanced" logging mode:

LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:285)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
#2:
	io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:106)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:376)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:225)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
#3:
	io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
	io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:940)
	io.netty.handler.codec.http.HttpObjectDecoder.readHeaders(HttpObjectDecoder.java:616)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:258)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:225)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
#4:
	io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
	io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:940)
	io.netty.handler.codec.http.HttpObjectDecoder$LineParser.parse(HttpObjectDecoder.java:997)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:239)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:225)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
#5:
	Hint: 'reactor.left.httpCodec' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
#6:
	io.netty.buffer.AdvancedLeakAwareByteBuf.internalNioBuffer(AdvancedLeakAwareByteBuf.java:736)
	io.netty.handler.ssl.SslHandler.toByteBuffer(SslHandler.java:1543)
	io.netty.handler.ssl.SslHandler.access$300(SslHandler.java:167)
	io.netty.handler.ssl.SslHandler$SslEngineType$3.unwrap(SslHandler.java:283)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1387)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
#7:
	io.netty.buffer.AdvancedLeakAwareByteBuf.nioBufferCount(AdvancedLeakAwareByteBuf.java:706)
	io.netty.handler.ssl.SslHandler.toByteBuffer(SslHandler.java:1543)
	io.netty.handler.ssl.SslHandler.access$300(SslHandler.java:167)
	io.netty.handler.ssl.SslHandler$SslEngineType$3.unwrap(SslHandler.java:283)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1387)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
	io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:2212)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1382)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
: 2 leak records were discarded because they were duplicates
: 22 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit.
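
The last line of the report says that 22 access records were dropped because of the `io.netty.leakDetection.targetRecords` limit. A minimal sketch of raising that limit and the detection level through system properties follows, assuming it runs before any Netty class is loaded (for example as the first statement of `main`). The two property names are Netty's. The class name `LeakDetectionSettings` and the values `paranoid` and `32` are only illustrative choices.

```java
// Hypothetical helper: sets Netty's leak-detection system properties.
// It must run before Netty initializes its leak detector, otherwise
// the values are likely to be ignored.
final class LeakDetectionSettings {

    static final String LEVEL_PROPERTY = "io.netty.leakDetection.level";
    static final String TARGET_RECORDS_PROPERTY = "io.netty.leakDetection.targetRecords";

    private LeakDetectionSettings() {
    }

    // level: one of disabled, simple, advanced, paranoid.
    // targetRecords: how many access records to keep per tracked buffer.
    static void apply(String level, int targetRecords) {
        if (targetRecords <= 0) {
            throw new IllegalArgumentException("targetRecords must be positive");
        }
        System.setProperty(LEVEL_PROPERTY, level);
        System.setProperty(TARGET_RECORDS_PROPERTY, Integer.toString(targetRecords));
    }

    public static void main(String[] args) {
        apply("paranoid", 32);
        System.out.println(LEVEL_PROPERTY + "=" + System.getProperty(LEVEL_PROPERTY));
        System.out.println(TARGET_RECORDS_PROPERTY + "=" + System.getProperty(TARGET_RECORDS_PROPERTY));
    }
}
```

The same values can be passed on the command line as `-Dio.netty.leakDetection.level=paranoid -Dio.netty.leakDetection.targetRecords=32`. The `paranoid` level tracks every buffer and is costly, so it is probably better suited to a test or staging environment than to production.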

Client configuration

There is only one client in this component, configured as follows:

ClientContext snippet:

private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(1);
private static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30);

@Bean
public ClientHttpConnector clientHttpConnector() {
	return new ReactorClientHttpConnector(HttpClient.create()
		.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) DEFAULT_CONNECTION_TIMEOUT.toMillis())
		.responseTimeout(DEFAULT_READ_TIMEOUT));
}

@Bean
public ICreditClient creditClient() {
	var componentUrl = componentClientConfiguration().getCreditUrl().toString();
	return new CreditClient(componentWebClientBuilder().baseUrl(componentUrl).build());
}

private WebClient.Builder componentWebClientBuilder() {

	return webClientBuilder.clone()
		.filter(renewTokenFilter())
		.filter(tokenFilter())
		.filter(new CorrelationIdHeaderFilter(configuration.getCorrelationIdSupplier()).filter())
		.filter(new PeerIdHeaderFilter(configuration.getPeerIdSupplier()).filter())
		.filter(new LoggingFilter().filter())
		.clientConnector(clientHttpConnector());
}

It is used as follows:

CreditClient snippet:

@Override
@CircuitBreaker(name = "credit-client-save-credit")
@TimeLimiter(name = "credit-client-save-credit")
public Mono<CustomerCreditId> saveCredit(CustomerCreditCreation customerCreditCreation) {

	return webClient.post()
		.uri("/credit/v1/customer-credits")
		.bodyValue(customerCreditCreation)
		.retrieve()
		.bodyToMono(CustomerCreditId.class);
}

Side notes

I thought the leak might be caused by the TimeLimiter, but I couldn't reproduce it that way either. The connection also seems to close correctly.

CreditClientTest snippet:

@Slf4j
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class CreditClientTest {

	@Autowired
	private ICreditClient client;

	@Autowired
	ObjectMapper objectMapper;

	@RegisterExtension
	WireMockExtension wireMockExtension = new WireMockExtension();

	@Test
	void testSaveCredit() throws Exception {
		var credit = CustomerCreditCreation.builder().build();
		var creditId = CustomerCreditId.builder().customerId(1).creditId(1).build();

		stubFor(post(urlEqualTo("/credit/v1/customer-credits")).withRequestBody(
			WireMock.equalToJson(objectMapper.writeValueAsString(credit)))
			.willReturn(aResponse().withFixedDelay(1000)
				.withStatus(HttpStatus.OK.value())
				.withHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
				.withBody(objectMapper.writeValueAsString(creditId))));

		try {
			var result = client.saveCredit(credit).block();
		} catch (Exception e) {
			log.info("No fallback", e);
		}
	}

	@SpringBootConfiguration
	@EnableAspectJAutoProxy
	@Import(value = { CircuitBreakerAutoConfiguration.class, RetryAutoConfiguration.class,
		TimeLimiterAutoConfiguration.class
	})
	public static class TestContext {

		@Bean
		public ObjectMapper objectMapper() {
			return new ObjectMapper();
		}

		@Autowired
		private TimeLimiterRegistry timeLimiterRegistry;

		@Bean
		public TimeLimiter creditClientTimeLimiter() {

			var config =
				TimeLimiterConfig.custom().timeoutDuration(Duration.ofMillis(250)).build();

			return timeLimiterRegistry.timeLimiter("credit-client-save-credit", config);
		}

		@Bean
		public CreditClient creditClient() {
			return new CreditClient(WebClient.builder().baseUrl("http://localhost:" + WireMockExtension.PORT).build());
		}
	}

}
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.4)

2021-04-14 15:38:03.258  INFO 48145 --- [           main] c.l.c.client.credit.CreditClientTest     : Starting CreditClientTest using Java 11.0.9.1 on MACC02ZL3G9LVDQ with PID 48145 (started by gabriel.dinant in /Users/gabriel.dinant/Documents/repositories/leshop/retention/cumulusvoucher/service)
2021-04-14 15:38:03.259 DEBUG 48145 --- [           main] c.l.c.client.credit.CreditClientTest     : Running with Spring Boot v2.4.4, Spring v5.3.5
2021-04-14 15:38:03.259  INFO 48145 --- [           main] c.l.c.client.credit.CreditClientTest     : The following profiles are active: test
2021-04-14 15:38:04.115 DEBUG 48145 --- [           main] reactor.netty.tcp.TcpResources           : [http] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-http, daemon=true, selectCount=12, workerCount=12}
2021-04-14 15:38:04.115 DEBUG 48145 --- [           main] reactor.netty.tcp.TcpResources           : [http] resources will use the default ConnectionProvider: reactor.netty.resources.DefaultPooledConnectionProvider@5d66e944
2021-04-14 15:38:04.267  INFO 48145 --- [           main] c.l.c.client.credit.CreditClientTest     : Started CreditClientTest in 1.294 seconds (JVM running for 2.622)
2021-04-14 15:38:04.511  INFO 48145 --- [           main] org.eclipse.jetty.server.Server          : jetty-9.4.38.v20210224; built: 2021-02-24T20:25:07.675Z; git: 288f3cc74549e8a913bf363250b0744f2695b8e6; jvm 11.0.9.1+1
2021-04-14 15:38:04.525  INFO 48145 --- [           main] o.e.jetty.server.handler.ContextHandler  : Started o.e.j.s.ServletContextHandler@42d236fb{/__admin,null,AVAILABLE}
2021-04-14 15:38:04.527  INFO 48145 --- [           main] o.e.jetty.server.handler.ContextHandler  : Started o.e.j.s.ServletContextHandler@ba1f559{/,null,AVAILABLE}
2021-04-14 15:38:04.547  INFO 48145 --- [           main] o.e.jetty.server.AbstractConnector       : Started NetworkTrafficServerConnector@1da6ee17{HTTP/1.1, (http/1.1)}{0.0.0.0:9095}
2021-04-14 15:38:04.548  INFO 48145 --- [           main] org.eclipse.jetty.server.Server          : Started @2903ms
2021-04-14 15:38:04.748  INFO 48145 --- [qtp305552520-24] o.e.j.s.handler.ContextHandler.__admin   : RequestHandlerClass from context returned com.github.tomakehurst.wiremock.http.AdminRequestHandler. Normalized mapped under returned 'null'
2021-04-14 15:38:04.982 DEBUG 48145 --- [           main] r.netty.resources.DefaultLoopIOUring     : Default io_uring support : false
2021-04-14 15:38:04.986 DEBUG 48145 --- [           main] r.netty.resources.DefaultLoopEpoll       : Default Epoll support : false
2021-04-14 15:38:04.986 DEBUG 48145 --- [           main] r.netty.resources.DefaultLoopKQueue      : Default KQueue support : false
2021-04-14 15:38:05.512 DEBUG 48145 --- [           main] r.n.resources.PooledConnectionProvider   : Creating a new [http] client pool [PoolFactory{evictionInterval=PT0S, leasingStrategy=fifo, maxConnections=500, maxIdleTime=-1, maxLifeTime=-1, metricsEnabled=false, pendingAcquireMaxCount=1000, pendingAcquireTimeout=45000}] for [localhost:9095]
2021-04-14 15:38:05.620 DEBUG 48145 --- [ctor-http-nio-2] r.n.r.DefaultPooledConnectionProvider    : [id:12019d3e] Created a new pooled channel, now: 1 active connections, 0 inactive connections and 0 pending acquire requests.
2021-04-14 15:38:05.620  INFO 48145 --- [           main] c.l.c.client.credit.CreditClientTest     : No fallback

reactor.core.Exceptions$ReactiveException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 250ms in 'flatMap' (and no fallback has been configured)
	at reactor.core.Exceptions.propagate(Exceptions.java:392) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.Mono.block(Mono.java:1703) ~[reactor-core-3.4.4.jar:3.4.4]
	at com.leshop.cumulusvoucher.client.credit.CreditClientTest.testSaveCredit(CreditClientTest.java:77) ~[test-classes/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) ~[junit-platform-commons-1.7.1.jar:1.7.1]
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) ~[na:na]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) ~[na:na]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) ~[junit-platform-engine-1.7.1.jar:1.7.1]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71) ~[junit5-rt.jar:na]
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) ~[junit-rt.jar:na]
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220) ~[junit-rt.jar:na]
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53) ~[junit-rt.jar:na]
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99) ~[reactor-core-3.4.4.jar:3.4.4]
		... 67 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 250ms in 'flatMap' (and no fallback has been configured)
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:294) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:279) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:418) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:119) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.4.jar:3.4.4]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2021-04-14 15:38:05.664 DEBUG 48145 --- [ctor-http-nio-2] reactor.netty.transport.TransportConfig  : [id:12019d3e] Initialized pipeline DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.left.httpDecompressor = io.netty.handler.codec.http.HttpContentDecompressor), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-04-14 15:38:05.698 DEBUG 48145 --- [ctor-http-nio-2] r.n.r.DefaultPooledConnectionProvider    : [id:12019d3e, L:/127.0.0.1:61436 - R:localhost/127.0.0.1:9095] Registering pool release on close event for channel
2021-04-14 15:38:05.699 DEBUG 48145 --- [ctor-http-nio-2] r.n.r.DefaultPooledConnectionProvider    : [id:12019d3e, L:/127.0.0.1:61436 - R:localhost/127.0.0.1:9095] Channel connected, now: 1 active connections, 0 inactive connections and 0 pending acquire requests.
2021-04-14 15:38:05.700 DEBUG 48145 --- [ctor-http-nio-2] r.n.r.DefaultPooledConnectionProvider    : [id:12019d3e, L:/127.0.0.1:61436 - R:localhost/127.0.0.1:9095] onStateChange(PooledConnection{channel=[id: 0x12019d3e, L:/127.0.0.1:61436 - R:localhost/127.0.0.1:9095]}, [connected])
2021-04-14 15:38:05.710 DEBUG 48145 --- [ctor-http-nio-2] r.n.r.DefaultPooledConnectionProvider    : [id:12019d3e-1, L:/127.0.0.1:61436 - R:localhost/127.0.0.1:9095] onStateChange(GET{uri=/, connection=PooledConnection{channel=[id: 0x12019d3e, L:/127.0.0.1:61436 - R:localhost/127.0.0.1:9095]}}, [configured])
2021-04-14 15:38:05.719 DEBUG 48145 --- [ctor-http-nio-2] r.n.r.DefaultPooledConnectionProvider    : [id:12019d3e-1, L:/127.0.0.1:61436 ! R:localhost/127.0.0.1:9095] Channel closed, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.
2021-04-14 15:38:05.720 DEBUG 48145 --- [ctor-http-nio-2] r.netty.http.client.HttpClientConnect    : [id:12019d3e-1, L:/127.0.0.1:61436 ! R:localhost/127.0.0.1:9095] Handler is being applied: {uri=http://localhost:9095/credit/v1/customer-credits, method=POST}
2021-04-14 15:38:05.723 DEBUG 48145 --- [ctor-http-nio-2] r.n.r.DefaultPooledConnectionProvider    : [id:12019d3e-1, L:/127.0.0.1:61436 ! R:localhost/127.0.0.1:9095] onStateChange(POST{uri=/credit/v1/customer-credits, connection=PooledConnection{channel=[id: 0x12019d3e, L:/127.0.0.1:61436 ! R:localhost/127.0.0.1:9095]}}, [request_prepared])
2021-04-14 15:38:05.743 DEBUG 48145 --- [ctor-http-nio-2] r.n.r.DefaultPooledConnectionProvider    : [id:12019d3e-1, L:/127.0.0.1:61436 ! R:localhost/127.0.0.1:9095] onStateChange(POST{uri=/credit/v1/customer-credits, connection=PooledConnection{channel=[id: 0x12019d3e, L:/127.0.0.1:61436 ! R:localhost/127.0.0.1:9095]}}, [disconnecting])
2021-04-14 15:38:06.634  INFO 48145 --- [           main] o.e.jetty.server.AbstractConnector       : Stopped NetworkTrafficServerConnector@1da6ee17{HTTP/1.1, (http/1.1)}{0.0.0.0:9095}
2021-04-14 15:38:06.635  INFO 48145 --- [           main] o.e.jetty.server.handler.ContextHandler  : Stopped o.e.j.s.ServletContextHandler@ba1f559{/,null,STOPPED}
2021-04-14 15:38:06.635  INFO 48145 --- [           main] o.e.jetty.server.handler.ContextHandler  : Stopped o.e.j.s.ServletContextHandler@42d236fb{/__admin,null,STOPPED}
  • Reactor version(s) used:

Reactor-netty: 1.0.5 (spring-boot-starter-webflux)

  • Other relevant libraries versions (eg. netty, ...):

Spring Boot: 2.4.4
Spring: 5.3.5
Resilience4j: 1.7.0

  • JVM version (java -version):

openjdk version "11.0.9.1" 2020-11-04
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.9.1+1)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.9.1+1, mixed mode)

  • OS and version (eg. uname -a):

Linux cumulusvoucher-34-d4gwj 3.10.0-1062.12.1.el7.x86_64 #1 SMP Thu Dec 12 06:44:49 EST 2019 x86_64 x86_64 x86_64 GNU/Linux

@gdinant gdinant added status/need-triage A new issue that still need to be evaluated as a whole type/bug A general bug labels Apr 14, 2021
@violetagg
Member

@gdinant In that memory leak stack I do not see any reactor.netty package. Is this the only stack with memory leak that you have?

@gdinant
Author

gdinant commented Apr 14, 2021

Oops, I surely got confused between reactor.netty and io.netty while googling for this specific exception 😢 which (always) led me here.

I assume I can just reopen this case in netty's github. Sorry @violetagg!

@violetagg
Member

@gdinant I'm not saying that the issue is or is not in Reactor Netty, I just do not see any reactor.netty in the stack.
Is this the only stack with memory leak that you have?

@violetagg
Member

@gdinant One additional question: You enabled -Dio.netty.leakDetection.level=paranoid and in application.properties you added logging.level._reactor.netty.channel.LeakDetection=debug, is that correct?

@gdinant
Author

gdinant commented Apr 14, 2021

I'm not saying that the issue is or is not in Reactor Netty, I just do not see any reactor.netty in the stack.
Is this the only stack with memory leak that you have?

Yes it is the only stack I can get (so far). Will post more if possible (and necessary).

One additional question: You enabled -Dio.netty.leakDetection.level=paranoid and in application.properties you added logging.level._reactor.netty.channel.LeakDetection=debug, is that correct?

Yes for the paranoid option. For the log level, I had set logging.level.reactor.netty=DEBUG.

I've just tried with your configuration instead, but I don't get anything more.

@gdinant
Author

gdinant commented Apr 15, 2021

Another piece of information (might help too). Regarding the filters applied:

private WebClient.Builder componentWebClientBuilder() {

	return webClientBuilder.clone()
		.filter(renewTokenFilter())
		.filter(tokenFilter())
		.filter(new CorrelationIdHeaderFilter(configuration.getCorrelationIdSupplier()).filter())
		.filter(new PeerIdHeaderFilter(configuration.getPeerIdSupplier()).filter())
		.filter(new LoggingFilter().filter())
		.clientConnector(clientHttpConnector());
}

They're all pretty trivial apart from the token-related filters, which make a downstream call in case of a 401 (token expiry).

Here's an example of the implementation:

public ExchangeFilterFunction tokenFilter() {

	return ExchangeFilterFunction.ofRequestProcessor(req -> {
		String token = tokenHolder.token();
		return Mono.just(ClientRequest.from(req).header(ITokenHandler.TOKEN_HTTP_HEADER, token).build());
	});
}

public ExchangeFilterFunction renewTokenFilter() {

	return (req, next) -> next.exchange(req).flatMap((Function<ClientResponse, Mono<ClientResponse>>) res -> {
		if (res.statusCode().value() == HttpStatus.UNAUTHORIZED.value()) {
			tokenHolder.renewToken();
			return next.exchange(req);
		} else {
			return Mono.just(res);
		}
	});
}

I thought that an exception thrown on the line tokenHolder.renewToken(); might create a LEAK. However, I couldn't reproduce the LEAK that way either. In a unit test everything seems to close as expected despite the exception. Production logs don't back up that hypothesis either, since I couldn't find any exception trace (though I can't rule out that exceptions are being swallowed somewhere).

@violetagg
Member

@gdinant So in case of HttpStatus.UNAUTHORIZED.value() you do not consume the incoming body, is that correct?

Is it possible instead of exchange() to call exchangeToMono() or exchangeToFlux()?
As opposed to exchange(), exchangeToMono()/exchangeToFlux() take care of releasing the memory.

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-client-exchange

@gdinant
Author

gdinant commented Apr 15, 2021

@violetagg you're right, and I'm aware of this pitfall regarding .exchange() on WebClient.

However, inside this filter, ExchangeFunction exposes only an exchange method.

@FunctionalInterface
public interface ExchangeFunction {

	/**
	 * Exchange the given request for a {@link ClientResponse} promise.
	 * @param request the request to exchange
	 * @return the delayed response
	 */
	Mono<ClientResponse> exchange(ClientRequest request);

	/**
	 * Filter the exchange function with the given {@code ExchangeFilterFunction},
	 * resulting in a filtered {@code ExchangeFunction}.
	 * @param filter the filter to apply to this exchange
	 * @return the filtered exchange
	 * @see ExchangeFilterFunction#apply(ExchangeFunction)
	 */
	default ExchangeFunction filter(ExchangeFilterFunction filter) {
		return filter.apply(this);
	}

}

Perhaps I do still need to consume the body somehow :).

@violetagg
Member

@rstoyanchev Can you take a look?

@violetagg violetagg removed the status/need-triage A new issue that still need to be evaluated as a whole label Apr 16, 2021
@rstoyanchev
Contributor

rstoyanchev commented Apr 16, 2021

WebClient is a thin facade around a chain of filters with an ExchangeFunction at the end to call the ClientHttpConnector and make the request. Something like this:

WebClient request building -> [filter1 -> filter2 -> ... -> ExchangeFunction/ClientHttpConnector] -> WebClient response handling

The exchange method in an ExchangeFilterFunction is indeed not deprecated, but when you operate on that level, specifically when you do some handling in the after phase, you are effectively taking over the response. Consider that WebClient is waiting for a Mono<ClientResponse> and if that never delivers but errors instead, it prevents WebClient from even seeing that there was a response.

Handle exceptions very carefully at that level and consume the response or call response.releaseBody().
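
To make the point above concrete, here is a minimal, synchronous toy model in plain Java, with no Spring or Reactor types. Response, FakeExchange, leakyRetry and releasingRetry are hypothetical names invented for this sketch; the real ClientResponse is reactive and its body is backed by pooled buffers. The sketch only illustrates the bookkeeping: a filter that requests a second response has taken over the first one, and the caller can only release the response it actually receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class FilterReleaseSketch {

    /** Hypothetical stand-in for a response whose body holds a pooled buffer. */
    static final class Response {
        final int status;
        private boolean released;

        Response(int status) {
            this.status = status;
        }

        void releaseBody() {
            released = true;
        }

        boolean isReleased() {
            return released;
        }
    }

    /** Hypothetical end of the chain: answers 401 first, then 200, and tracks every response. */
    static final class FakeExchange implements Function<String, Response> {
        final List<Response> issued = new ArrayList<>();

        @Override
        public Response apply(String request) {
            Response response = new Response(issued.isEmpty() ? 401 : 200);
            issued.add(response);
            return response;
        }

        long unreleased() {
            return issued.stream().filter(r -> !r.isReleased()).count();
        }
    }

    /** Retries on 401 but drops the first response without releasing it. */
    static Response leakyRetry(Function<String, Response> next, String request) {
        Response response = next.apply(request);
        if (response.status == 401) {
            return next.apply(request);
        }
        return response;
    }

    /** Retries on 401 and releases the response it is discarding. */
    static Response releasingRetry(Function<String, Response> next, String request) {
        Response response = next.apply(request);
        if (response.status == 401) {
            response.releaseBody();
            return next.apply(request);
        }
        return response;
    }

    public static void main(String[] args) {
        FakeExchange first = new FakeExchange();
        // The caller consumes (and thereby releases) only the response it is handed.
        leakyRetry(first, "POST /credit").releaseBody();
        System.out.println("leaky filter, unreleased responses: " + first.unreleased());

        FakeExchange second = new FakeExchange();
        releasingRetry(second, "POST /credit").releaseBody();
        System.out.println("releasing filter, unreleased responses: " + second.unreleased());
    }
}
```

In this model the leaky filter leaves one response unreleased and the releasing filter leaves none, which mirrors why the 401 response in a retry filter has to be consumed or released by the filter itself.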

@gdinant
Author

gdinant commented Apr 19, 2021

I eventually managed to reproduce the LEAK. Just needed to run the test for a longer time with more hits.

The leak seems to disappear by adding the following line "consuming" the response body:

public ExchangeFilterFunction renewTokenFilter() {

	return (req, next) -> next.exchange(req).flatMap((Function<ClientResponse, Mono<ClientResponse>>) res -> {
		if (res.statusCode().value() == HttpStatus.UNAUTHORIZED.value()) {
			res.releaseBody().subscribe(); // LINE ADDED
			tokenHolder.renewToken();
			return next.exchange(req);
		} else {
			return Mono.just(res);
		}
	});
}

However, calling "blocking" functions inside non-blocking code isn't recommended. If I were to replace .subscribe() with .block() it would obviously fail.

@rstoyanchev do you see another way of doing this, or is this the right approach?

-- EDIT

I rewrote the function as follows, which seems slightly better and gets rid of the LEAK as well:

public ExchangeFilterFunction renewTokenFilter() {

	return (req, next) -> next.exchange(req).flatMap((Function<ClientResponse, Mono<ClientResponse>>) res -> {
		if (res.statusCode().value() == HttpStatus.UNAUTHORIZED.value()) {
			return res.releaseBody().thenReturn(tokenHolder.renewToken()).flatMap(t -> next.exchange(req));
		} else {
			return Mono.just(res);
		}
	});
}

Tell me if you see anything wrong :)
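
A side note on evaluation order in the rewritten filter: thenReturn takes an already computed value, so Java evaluates tokenHolder.renewToken() while the chain is being assembled, before releaseBody() has been subscribed. The sketch below reproduces the same ordering with the JDK's CompletableFuture rather than Reactor. The thenReturn helper, renewToken and the log list are hypothetical stand-ins written for this illustration, not the code from this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class EagerArgumentSketch {

    /** Hypothetical stand-in for tokenHolder.renewToken(); records when it runs. */
    static String renewToken(List<String> log) {
        log.add("renew");
        return "new-token";
    }

    /** Hypothetical analogue of a "then return this value" operator. */
    static <T> CompletableFuture<T> thenReturn(CompletableFuture<?> first, T value) {
        return first.thenApply(ignored -> value);
    }

    /** The value is computed as a method argument, before the release step has run. */
    static List<String> eagerOrder() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> trigger = new CompletableFuture<>();
        CompletableFuture<String> result =
                thenReturn(trigger.thenRun(() -> log.add("release")), renewToken(log));
        trigger.complete(null); // only now does the release step execute
        result.join();
        return log;
    }

    /** The renewal is wrapped in a callback, so it runs after the release step. */
    static List<String> deferredOrder() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> trigger = new CompletableFuture<>();
        CompletableFuture<String> result =
                trigger.thenRun(() -> log.add("release")).thenApply(ignored -> renewToken(log));
        trigger.complete(null);
        result.join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + eagerOrder());    // [renew, release]
        System.out.println("deferred: " + deferredOrder()); // [release, renew]
    }
}
```

With Reactor, one possible deferred form would be res.releaseBody().then(Mono.fromCallable(tokenHolder::renewToken)).flatMap(t -> next.exchange(req)), assuming renewToken() returns a non-null value. Whether the ordering matters depends on what renewToken() does. If it blocks, it still runs on the calling thread in either form.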

@rstoyanchev
Contributor

That looks good @gdinant

@violetagg violetagg removed the type/bug A general bug label Apr 21, 2021
@gdinant
Author

gdinant commented Apr 21, 2021

Perfect then, closing the issue ;-).

Thanks for your help!

@gdinant gdinant closed this as completed Apr 21, 2021
@violetagg violetagg added status/invalid We don't feel this issue is valid and removed for/user-attention This issue needs user attention (feedback, rework, etc...) labels Apr 21, 2021
@violetagg violetagg changed the title SpringBoot WebClient: LEAK: ByteBuf.release() was not called before it's garbage-collected SpringBoot WebClient memory leak when the response is not consumed in the filter Apr 21, 2021