Folks, I use RSocket to transfer lots of data, so I would like to achieve zero copy as much as possible. For that reason, I use ByteBufPayload. After upgrading from 0.11.3 to 0.11.13, however, I started seeing refCnt-related exceptions thrown from Netty's ByteBuf. I'm looking for guidance on how ByteBufPayload should be used.
Here is my code:
// Initialize rsocket client with ByteBufPayload decoder
RSocketFactory.connect()
    .setupPayload(...)
    .frameDecoder(ByteBufPayload::create)
    ...
try {
  Payload request = prepareRequest();
  AtomicBoolean first = new AtomicBoolean(true);
  return socket.getRSocketMono()
      .flux()
      .flatMap(s -> s.requestStream(request))
      .timeout(Mono.delay(requestTimeout))
      .map(
          payload -> {
            if (first.getAndSet(false)) {
              // read from first payload
              byte[] buf = new byte[16];
              payload.sliceData().readBytes(buf, 0, 16);
            }
            return payload;
          })
      .skip(1)
      .map(
          payload -> {
            // read from payload and return an object that references the ByteBuf that payload references
            byte[] buf = new byte[16];
            payload.sliceData().readBytes(buf, 0, 16);
            // compare string contents with equals(); == compares object identity
            return "foo".equals(new String(buf)) ? new MyObject(payload.sliceData().retain()) : new MyObject();
          })
      .limitRate(rate)
      .doOnError(...)
      .doFinally(...);
} catch (Exception ex) {
  return Flux.error(ex);
}
With 0.11.13, I got a leak in Netty, presumably because of #524 (@mostroverkhov, @rdegnan, @robertroeser), which calls retain() when creating a ByteBufPayload. I fixed it by adding a release:
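payload -> {
  try {
    byte[] buf = new byte[16];
    payload.sliceData().readBytes(buf, 0, 16);
    // compare string contents with equals(); == compares object identity
    return "foo".equals(new String(buf)) ? new MyObject(payload.sliceData().retain()) : new MyObject();
  } finally {
    // drop the reference held by the payload itself
    payload.release();
  }
}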
That fixed the leak, but now I sometimes get an exception when calling payload.sliceData().readBytes():
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1417)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1403)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1392)
at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:872)
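To make the counting concrete, here is a toy model in plain Java of how I understand the reference counting to behave. It does not use Netty: `ToyBuf` and all of its methods are hypothetical stand-ins, and the only assumption carried over is that a slice shares its parent's reference count. It is a sketch of one way "refCnt: 0" could arise (a read after the last reference has been released), not a diagnosis of what RSocket actually does.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountSketch {
  /** Toy stand-in for a reference-counted buffer. NOT Netty's implementation. */
  static final class ToyBuf {
    private final byte[] bytes;
    private final AtomicInteger refCnt; // shared between a buffer and its slices

    ToyBuf(byte[] bytes) {
      this(bytes, new AtomicInteger(1)); // a new buffer starts with one reference
    }

    private ToyBuf(byte[] bytes, AtomicInteger shared) {
      this.bytes = bytes;
      this.refCnt = shared;
    }

    /** Returns a view that shares this buffer's reference count (assumption). */
    ToyBuf slice() {
      return new ToyBuf(bytes, refCnt);
    }

    ToyBuf retain() {
      ensureAccessible();
      refCnt.incrementAndGet();
      return this;
    }

    /** Returns true when this call dropped the last reference. */
    boolean release() {
      ensureAccessible();
      return refCnt.decrementAndGet() == 0;
    }

    int refCnt() {
      return refCnt.get();
    }

    byte readByte(int index) {
      ensureAccessible();
      return bytes[index];
    }

    private void ensureAccessible() {
      if (refCnt.get() <= 0) {
        throw new IllegalStateException("refCnt: 0");
      }
    }
  }

  public static void main(String[] args) {
    ToyBuf payload = new ToyBuf(new byte[] {'f', 'o', 'o'});

    // Same shape as the map() above: retain a slice, then release the payload.
    ToyBuf kept = payload.slice().retain(); // count goes 1 -> 2
    payload.release();                      // count goes 2 -> 1
    System.out.println("after finally: refCnt=" + kept.refCnt());

    // Once the retained slice is released too, any later read fails.
    kept.release();                         // count goes 1 -> 0
    try {
      payload.slice().readByte(0);
    } catch (IllegalStateException e) {
      System.out.println("read failed: " + e.getMessage());
    }
  }
}
```

In this model the retain-then-release pattern leaves exactly one reference for MyObject, so a read only fails if something else releases the same buffer first, or if the payload is read again after its release.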
So how should I use ByteBufPayload?
After digging further into the RSocket code, I'm also concerned that a ByteBufPayload may not be released if an exception occurs after ByteBufPayload::create but before the payload reaches FluxMap, where I can call release().
Thanks