diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java index 8b65ee63c..e178bc971 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java @@ -17,7 +17,7 @@ package io.rsocket.fragmentation; import static io.rsocket.framing.PayloadFrame.createPayloadFrame; -import static io.rsocket.util.DisposableUtil.disposeQuietly; +import static io.rsocket.util.DisposableUtils.disposeQuietly; import static java.lang.Math.min; import io.netty.buffer.ByteBuf; diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java index 7b4e08232..ba1886bcf 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java @@ -16,7 +16,7 @@ package io.rsocket.fragmentation; -import static io.rsocket.util.DisposableUtil.disposeQuietly; +import static io.rsocket.util.DisposableUtils.disposeQuietly; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; diff --git a/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java index 5eb369bd9..3cd56f93d 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java @@ -16,12 +16,14 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static java.nio.charset.StandardCharsets.UTF_8; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.util.Recycler.Handle; +import io.netty.util.ReferenceCounted; import java.util.Objects; import reactor.util.annotation.Nullable; @@ -51,7 +53,7 @@ abstract class AbstractRecyclableFrame getMetadataLength() { * if you store it. * * @return the metadata directly, or {@code null} if the Metadata flag is not set + * @see #getMetadataAsUtf8() * @see #mapMetadata(Function) */ @Nullable diff --git a/rsocket-core/src/main/java/io/rsocket/framing/MetadataPushFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/MetadataPushFrame.java index 8b982455d..2490fe84c 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/MetadataPushFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/MetadataPushFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.framing.FrameType.METADATA_PUSH; import static io.rsocket.util.RecyclerFactory.createRecycler; @@ -69,8 +70,13 @@ public static MetadataPushFrame createMetadataPushFrame(ByteBuf byteBuf) { public static MetadataPushFrame createMetadataPushFrame( ByteBufAllocator byteBufAllocator, String metadata) { - return createMetadataPushFrame( - byteBufAllocator, getUtf8AsByteBufRequired(metadata, "metadata must not be null")); + ByteBuf metadataByteBuf = getUtf8AsByteBufRequired(metadata, "metadata must not be null"); + + try { + return createMetadataPushFrame(byteBufAllocator, metadataByteBuf); + } finally { + release(metadataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/PayloadFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/PayloadFrame.java index 50b6e42b2..c91777df6 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/PayloadFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/PayloadFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; @@ -78,8 +79,15 @@ public static PayloadFrame createPayloadFrame( @Nullable String metadata, @Nullable String data) { - return createPayloadFrame( - byteBufAllocator, follows, complete, getUtf8AsByteBuf(metadata), getUtf8AsByteBuf(data)); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createPayloadFrame(byteBufAllocator, follows, complete, metadataByteBuf, dataByteBuf); + } finally { + release(metadataByteBuf); + release(dataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/RequestChannelFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/RequestChannelFrame.java index f1e74bd62..95b5def86 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/RequestChannelFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/RequestChannelFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; @@ -83,13 +84,16 @@ public static RequestChannelFrame createRequestChannelFrame( @Nullable String metadata, @Nullable String data) { - return createRequestChannelFrame( - byteBufAllocator, - follows, - complete, - initialRequestN, - getUtf8AsByteBuf(metadata), - getUtf8AsByteBuf(data)); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createRequestChannelFrame( + byteBufAllocator, follows, complete, initialRequestN, metadataByteBuf, dataByteBuf); + } finally { + release(metadataByteBuf); + release(dataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/RequestFireAndForgetFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/RequestFireAndForgetFrame.java index 3749ee329..2f4dbe978 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/RequestFireAndForgetFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/RequestFireAndForgetFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; @@ -74,8 +75,16 @@ public static RequestFireAndForgetFrame createRequestFireAndForgetFrame( @Nullable String metadata, @Nullable String data) { - return createRequestFireAndForgetFrame( - byteBufAllocator, follows, getUtf8AsByteBuf(metadata), getUtf8AsByteBuf(data)); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createRequestFireAndForgetFrame( + byteBufAllocator, follows, metadataByteBuf, dataByteBuf); + } finally { + release(metadataByteBuf); + release(dataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/RequestResponseFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/RequestResponseFrame.java index e0b0d9291..ce834ca20 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/RequestResponseFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/RequestResponseFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; @@ -74,8 +75,15 @@ public static RequestResponseFrame createRequestResponseFrame( @Nullable String metadata, @Nullable String data) { - return createRequestResponseFrame( - byteBufAllocator, follows, getUtf8AsByteBuf(metadata), getUtf8AsByteBuf(data)); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createRequestResponseFrame(byteBufAllocator, follows, metadataByteBuf, dataByteBuf); + } finally { + release(metadataByteBuf); + release(dataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/RequestStreamFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/RequestStreamFrame.java index f372c631a..fa5747a63 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/RequestStreamFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/RequestStreamFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; @@ -79,12 +80,16 @@ public static RequestStreamFrame createRequestStreamFrame( @Nullable String metadata, @Nullable String data) { - return createRequestStreamFrame( - byteBufAllocator, - follows, - initialRequestN, - getUtf8AsByteBuf(metadata), - getUtf8AsByteBuf(data)); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createRequestStreamFrame( + byteBufAllocator, follows, initialRequestN, metadataByteBuf, dataByteBuf); + } finally { + release(metadataByteBuf); + release(dataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/ResumeFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/ResumeFrame.java index c575d2ba2..1ea2eda31 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/ResumeFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/ResumeFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.framing.FrameType.RESUME; import static io.rsocket.framing.LengthUtils.getLengthAsUnsignedShort; import static io.rsocket.util.NumberUtils.requireUnsignedShort; @@ -70,12 +71,19 @@ public static ResumeFrame createResumeFrame( long lastReceivedServerPosition, long firstAvailableClientPosition) { - return createResumeFrame( - byteBufAllocator, + ByteBuf resumeIdentificationTokenByteBuf = getUtf8AsByteBufRequired( - resumeIdentificationToken, "resumeIdentificationToken must not be null"), - lastReceivedServerPosition, - firstAvailableClientPosition); + resumeIdentificationToken, "resumeIdentificationToken must not be null"); + + try { + return createResumeFrame( + byteBufAllocator, + resumeIdentificationTokenByteBuf, + lastReceivedServerPosition, + firstAvailableClientPosition); + } finally { + release(resumeIdentificationToken); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/SetupFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/SetupFrame.java index b58f2a943..0591578a6 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/SetupFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/SetupFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.framing.LengthUtils.getLengthAsUnsignedByte; import static io.rsocket.framing.LengthUtils.getLengthAsUnsignedShort; import static io.rsocket.util.NumberUtils.requireUnsignedShort; @@ -110,16 +111,26 @@ public static SetupFrame createSetupFrame( @Nullable String metadata, @Nullable String data) { - return createSetupFrame( - byteBufAllocator, - lease, - keepAliveInterval, - maxLifetime, - getUtf8AsByteBuf(resumeIdentificationToken), - metadataMimeType, - dataMimeType, - getUtf8AsByteBuf(metadata), - getUtf8AsByteBuf(data)); + ByteBuf resumeIdentificationTokenByteBuf = getUtf8AsByteBuf(resumeIdentificationToken); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createSetupFrame( + byteBufAllocator, + lease, + keepAliveInterval, + maxLifetime, + resumeIdentificationTokenByteBuf, + metadataMimeType, + dataMimeType, + metadataByteBuf, + dataByteBuf); + } finally { + release(resumeIdentificationTokenByteBuf); + release(metadataByteBuf); + release(dataByteBuf); + } } /** @@ -211,38 +222,46 @@ public static SetupFrame createSetupFrame( ByteBuf dataMimeTypeByteBuf = getUtf8AsByteBufRequired(dataMimeType, "dataMimeType must not be null"); - ByteBuf byteBuf = createFrameTypeAndFlags(byteBufAllocator, FrameType.SETUP); - - if (lease) { - byteBuf = setFlag(byteBuf, FLAG_LEASE); - } + try { + ByteBuf byteBuf = createFrameTypeAndFlags(byteBufAllocator, FrameType.SETUP); - byteBuf = - byteBuf - .writeShort(requireUnsignedShort(majorVersion)) - .writeShort(requireUnsignedShort(minorVersion)) - .writeInt(toIntExact(keepAliveInterval.toMillis())) - .writeInt(toIntExact(maxLifetime.toMillis())); + if (lease) { + byteBuf = setFlag(byteBuf, FLAG_LEASE); + } - if (resumeIdentificationToken != null) { byteBuf = - setFlag(byteBuf, FLAG_RESUME_ENABLED) - .writeShort(getLengthAsUnsignedShort(resumeIdentificationToken)); + byteBuf + .writeShort(requireUnsignedShort(majorVersion)) + .writeShort(requireUnsignedShort(minorVersion)) + .writeInt(toIntExact(keepAliveInterval.toMillis())) + .writeInt(toIntExact(maxLifetime.toMillis())); + + if (resumeIdentificationToken != null) { + byteBuf = + setFlag(byteBuf, FLAG_RESUME_ENABLED) + .writeShort(getLengthAsUnsignedShort(resumeIdentificationToken)); + byteBuf = + Unpooled.wrappedBuffer( + byteBuf, resumeIdentificationToken.retain(), byteBufAllocator.buffer()); + } + + byteBuf = byteBuf.writeByte(getLengthAsUnsignedByte(metadataMimeTypeByteBuf)); byteBuf = Unpooled.wrappedBuffer( - byteBuf, resumeIdentificationToken.retain(), byteBufAllocator.buffer()); - } + byteBuf, metadataMimeTypeByteBuf.retain(), byteBufAllocator.buffer()); - byteBuf = byteBuf.writeByte(getLengthAsUnsignedByte(metadataMimeTypeByteBuf)); - byteBuf = Unpooled.wrappedBuffer(byteBuf, metadataMimeTypeByteBuf, byteBufAllocator.buffer()); - - byteBuf = byteBuf.writeByte(getLengthAsUnsignedByte(dataMimeTypeByteBuf)); - byteBuf = Unpooled.wrappedBuffer(byteBuf, dataMimeTypeByteBuf, byteBufAllocator.buffer()); + byteBuf = byteBuf.writeByte(getLengthAsUnsignedByte(dataMimeTypeByteBuf)); + byteBuf = + Unpooled.wrappedBuffer(byteBuf, dataMimeTypeByteBuf.retain(), byteBufAllocator.buffer()); - byteBuf = appendMetadata(byteBufAllocator, byteBuf, metadata); - byteBuf = appendData(byteBuf, data); + byteBuf = appendMetadata(byteBufAllocator, byteBuf, metadata); + byteBuf = appendData(byteBuf, data); - return RECYCLER.get().setByteBuf(byteBuf); + return RECYCLER.get().setByteBuf(byteBuf); + } finally { + release(metadataMimeTypeByteBuf); + release(dataMimeTypeByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/util/AbstractionLeakingFrameUtils.java b/rsocket-core/src/main/java/io/rsocket/util/AbstractionLeakingFrameUtils.java index c7215da36..7e5a5d771 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/AbstractionLeakingFrameUtils.java +++ b/rsocket-core/src/main/java/io/rsocket/util/AbstractionLeakingFrameUtils.java @@ -16,9 +16,10 @@ package io.rsocket.util; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.framing.FrameLengthFrame.createFrameLengthFrame; import static io.rsocket.framing.StreamIdFrame.createStreamIdFrame; -import static io.rsocket.util.DisposableUtil.disposeQuietly; +import static io.rsocket.util.DisposableUtils.disposeQuietly; import io.netty.buffer.ByteBufAllocator; import io.rsocket.Frame; @@ -60,7 +61,7 @@ public static Tuple2 fromAbstractionLeakingFr return Tuples.of(streamIdFrame.getStreamId(), frame); } finally { disposeQuietly(frameLengthFrame, streamIdFrame); - abstractionLeakingFrame.release(); + release(abstractionLeakingFrame); } } diff --git a/rsocket-core/src/main/java/io/rsocket/util/DisposableUtil.java b/rsocket-core/src/main/java/io/rsocket/util/DisposableUtils.java similarity index 88% rename from rsocket-core/src/main/java/io/rsocket/util/DisposableUtil.java rename to rsocket-core/src/main/java/io/rsocket/util/DisposableUtils.java index 92dd1d82c..c87a08220 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DisposableUtil.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DisposableUtils.java @@ -19,9 +19,9 @@ import reactor.core.Disposable; /** Utilities for working with the {@link Disposable} type. */ -public final class DisposableUtil { +public final class DisposableUtils { - private DisposableUtil() {} + private DisposableUtils() {} /** * Calls the {@link Disposable#dispose()} method if the instance is not null. If any exceptions @@ -34,7 +34,9 @@ public static void disposeQuietly(Disposable... disposables) { .forEach( disposable -> { try { - disposable.dispose(); + if (disposable != null) { + disposable.dispose(); + } } catch (RuntimeException e) { // Suppress any exceptions during disposal }