From 85cee9d1a8881a93e7c43a02baefea75b6ade2b5 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 2 Mar 2024 07:14:58 -0800 Subject: [PATCH] =?UTF-8?q?Transport:=20AdaptiveRecvByteBufAllocator=20doe?= =?UTF-8?q?s=20not=20correctly=20calculate=20=E2=80=A6=20(#13882)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …initial buffer capacity in some cases Motivation: We did not correctly calculate the initial index sometimes which could lead to the initial buffer capacity to exceed the configured maximum size. Modifications: - Correctly calculate initialIndex - Add unit tests Result: Fixes https://github.com/netty/netty/issues/13722 --- .../channel/AdaptiveRecvByteBufAllocator.java | 29 ++++++++--- .../AdaptiveRecvByteBufAllocatorTest.java | 50 +++++++++++++++++++ 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java index 46ef634f528..c8feb40a616 100644 --- a/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java +++ b/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java @@ -94,16 +94,20 @@ private static int getSizeTableIndex(final int size) { private final class HandleImpl extends MaxMessageHandle { private final int minIndex; private final int maxIndex; + private final int minCapacity; + private final int maxCapacity; private int index; private int nextReceiveBufferSize; private boolean decreaseNow; - HandleImpl(int minIndex, int maxIndex, int initial) { + HandleImpl(int minIndex, int maxIndex, int initialIndex, int minCapacity, int maxCapacity) { this.minIndex = minIndex; this.maxIndex = maxIndex; - index = getSizeTableIndex(initial); - nextReceiveBufferSize = SIZE_TABLE[index]; + index = initialIndex; + nextReceiveBufferSize = max(SIZE_TABLE[index], minCapacity); + this.minCapacity = minCapacity; + this.maxCapacity = maxCapacity; } @Override @@ -127,14 +131,14 @@ private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) { if (decreaseNow) { index = max(index - INDEX_DECREMENT, minIndex); - nextReceiveBufferSize = SIZE_TABLE[index]; + nextReceiveBufferSize = max(SIZE_TABLE[index], minCapacity); decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) { index = min(index + INDEX_INCREMENT, maxIndex); - nextReceiveBufferSize = SIZE_TABLE[index]; + nextReceiveBufferSize = min(SIZE_TABLE[index], maxCapacity); decreaseNow = false; } } @@ -147,7 +151,9 @@ public void readComplete() { private final int minIndex; private final int maxIndex; - private final int initial; + private final int initialIndex; + private final int minCapacity; + private final int maxCapacity; /** * Creates a new predictor with the default parameters. With the default @@ -188,13 +194,20 @@ public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { this.maxIndex = maxIndex; } - this.initial = initial; + int initialIndex = getSizeTableIndex(initial); + if (SIZE_TABLE[initialIndex] > initial) { + this.initialIndex = initialIndex - 1; + } else { + this.initialIndex = initialIndex; + } + this.minCapacity = minimum; + this.maxCapacity = maximum; } @SuppressWarnings("deprecation") @Override public Handle newHandle() { - return new HandleImpl(minIndex, maxIndex, initial); + return new HandleImpl(minIndex, maxIndex, initialIndex, minCapacity, maxCapacity); } @Override diff --git a/transport/src/test/java/io/netty/channel/AdaptiveRecvByteBufAllocatorTest.java b/transport/src/test/java/io/netty/channel/AdaptiveRecvByteBufAllocatorTest.java index f2ec812f16d..309e0fc4e22 100644 --- a/transport/src/test/java/io/netty/channel/AdaptiveRecvByteBufAllocatorTest.java +++ b/transport/src/test/java/io/netty/channel/AdaptiveRecvByteBufAllocatorTest.java @@ -20,9 +20,11 @@ import io.netty.buffer.UnpooledByteBufAllocator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; import org.mockito.Mock; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -98,6 +100,54 @@ public void lastPartialReadCanRampUp() { allocReadExpected(handle, alloc, 131072); } + @Test + public void doesNotExceedMaximum() { + AdaptiveRecvByteBufAllocator recvByteBufAllocator = new AdaptiveRecvByteBufAllocator(64, 9000, 9000); + RecvByteBufAllocator.ExtendedHandle handle = + (RecvByteBufAllocator.ExtendedHandle) recvByteBufAllocator.newHandle(); + handle.reset(config); + allocReadExpected(handle, alloc, 8192); + } + + @Test + public void doesSetCorrectMinBounds() { + AdaptiveRecvByteBufAllocator recvByteBufAllocator = new AdaptiveRecvByteBufAllocator(81, 95, 95); + RecvByteBufAllocator.ExtendedHandle handle = + (RecvByteBufAllocator.ExtendedHandle) recvByteBufAllocator.newHandle(); + handle.reset(config); + allocReadExpected(handle, alloc, 81); + } + + @Test + public void throwsIfInitialIsBiggerThenMaximum() { + assertThrows(IllegalArgumentException.class, new Executable() { + @Override + public void execute() { + new AdaptiveRecvByteBufAllocator(64, 4096 , 1024); + } + }); + } + + @Test + public void throwsIfInitialIsSmallerThenMinimum() { + assertThrows(IllegalArgumentException.class, new Executable() { + @Override + public void execute() { + new AdaptiveRecvByteBufAllocator(512, 64 , 1024); + } + }); + } + + @Test + public void throwsIfMinimumIsBiggerThenMaximum() { + assertThrows(IllegalArgumentException.class, new Executable() { + @Override + public void execute() { + new AdaptiveRecvByteBufAllocator(2048, 64 , 1024); + } + }); + } + private static void allocReadExpected(RecvByteBufAllocator.ExtendedHandle handle, ByteBufAllocator alloc, int expectedSize) {