diff --git a/src/main/java/io/netty/buffer/api/Buf.java b/src/main/java/io/netty/buffer/api/Buf.java index 98404a1..36d4305 100644 --- a/src/main/java/io/netty/buffer/api/Buf.java +++ b/src/main/java/io/netty/buffer/api/Buf.java @@ -367,12 +367,53 @@ default ByteCursor openReverseCursor() { * If this buffer already has the necessary space, then this method returns immediately. * If this buffer does not already have the necessary space, then it will be expanded using the {@link Allocator} * the buffer was created with. + * This method is the same as calling {@link #ensureWritable(int, boolean)} where {@code allowCompaction} is + * {@code false}. * * @param size The requested number of bytes of space that should be available for writing. * @throws IllegalStateException if this buffer is not in an owned state. - * That is, if {@link #countBorrows()} is not {@code 0}. + * That is, if {@link #isOwned()} is {@code false}. */ - void ensureWritable(int size); + default void ensureWritable(int size) { + ensureWritable(size, true); + } + + /** + * Ensure that this buffer has {@linkplain #writableBytes() available space for writing} the given number of + * bytes. + * The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown. + * If this buffer already has the necessary space, then this method returns immediately. + * If this buffer does not already have the necessary space, then space will be made available in one or all of + * the following available ways: + * + * + * + * @param size The requested number of bytes of space that should be available for writing. + * @param allowCompaction {@code true} if the method is allowed to modify the + * {@linkplain #readerOffset() reader offset} and + * {@linkplain #writerOffset() writer offset}, otherwise {@code false}. + * @throws IllegalStateException if this buffer is not in an owned state. + * That is, if {@link #isOwned()} is {@code false}. + */ + void ensureWritable(int size, boolean allowCompaction); /** * Split the buffer into two, at the {@linkplain #writerOffset() write offset} position. @@ -417,4 +458,11 @@ default ByteCursor openReverseCursor() { * @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer. */ Buf bifurcate(); + + /** + * Discards the read bytes, and moves the buffer contents to the beginning of the buffer. + * + * The buffer must be {@linkplain #isOwned() owned}, or an exception will be thrown. + */ + void compact(); } diff --git a/src/main/java/io/netty/buffer/api/CompositeBuf.java b/src/main/java/io/netty/buffer/api/CompositeBuf.java index d3c4f07..7dd818d 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuf.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuf.java @@ -20,6 +20,9 @@ import java.util.Arrays; import java.util.Objects; +import static jdk.incubator.foreign.MemoryAccess.setByteAtOffset; +import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset; + final class CompositeBuf extends RcSupport implements Buf { /** * The max array size is JVM implementation dependant, but most seem to settle on {@code Integer.MAX_VALUE - 8}. @@ -520,20 +523,54 @@ public int bytesLeft() { } @Override - public void ensureWritable(int size) { + public void ensureWritable(int size, boolean allowCompaction) { if (!isOwned()) { throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable."); } if (size < 0) { throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); } - if (writableBytes() < size) { - long newSize = capacity() + (long) size; - Allocator.checkSize(newSize); - int growth = size - writableBytes(); - Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order()); - unsafeExtendWith(extension); + if (writableBytes() >= size) { + // We already have enough space. + return; } + + if (allowCompaction && size <= roff) { + // Let's see if we can solve some or all of the requested size with compaction. + // We always compact as much as is possible, regardless of size. This amortizes our work. + int compactableBuffers = 0; + for (Buf buf : bufs) { + if (buf.capacity() != buf.readerOffset()) { + break; + } + compactableBuffers++; + } + if (compactableBuffers > 0) { + Buf[] compactable; + if (compactableBuffers < bufs.length) { + compactable = new Buf[compactableBuffers]; + System.arraycopy(bufs, 0, compactable, 0, compactable.length); + System.arraycopy(bufs, compactable.length, bufs, 0, bufs.length - compactable.length); + System.arraycopy(compactable, 0, bufs, bufs.length - compactable.length, compactable.length); + } else { + compactable = bufs; + } + for (Buf buf : compactable) { + buf.reset(); + } + computeBufferOffsets(); + if (writableBytes() >= size) { + // Now we have enough space. + return; + } + } + } + + long newSize = capacity() + (long) size; + Allocator.checkSize(newSize); + int growth = size - writableBytes(); + Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order()); + unsafeExtendWith(extension); } void extendWith(Buf extension) { @@ -600,6 +637,35 @@ public Buf bifurcate() { } } + @Override + public void compact() { + if (!isOwned()) { + throw new IllegalStateException("Buffer must be owned in order to compact."); + } + int distance = roff; + if (distance == 0) { + return; + } + int pos = 0; + var oldOrder = order; + order = ByteOrder.BIG_ENDIAN; + try { + var cursor = openCursor(); + while (cursor.readLong()) { + setLong(pos, cursor.getLong()); + pos += Long.BYTES; + } + while (cursor.readByte()) { + setByte(pos, cursor.getByte()); + pos++; + } + } finally { + order = oldOrder; + } + readerOffset(0); + writerOffset(woff - distance); + } + // @Override public byte readByte() { diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java index 6631824..6484c04 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java @@ -323,38 +323,50 @@ public int bytesLeft() { } @Override - public void ensureWritable(int size) { + public void ensureWritable(int size, boolean allowCompaction) { if (!isOwned()) { throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable."); } if (size < 0) { throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); } - if (writableBytes() < size) { - long newSize = capacity() + size - (long) writableBytes(); - Allocator.checkSize(newSize); - RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize); - var newSegment = recoverableMemory.segment; - newSegment.copyFrom(seg); - - // Release old memory segment: - var drop = unsafeGetDrop(); - if (drop instanceof BifurcatedDrop) { - // Disconnect from the bifurcated drop, since we'll get our own fresh memory segment. - int roff = this.roff; - int woff = this.woff; - drop.drop(this); - drop = ((BifurcatedDrop) drop).unwrap(); - unsafeSetDrop(drop); - this.roff = roff; - this.woff = woff; - } else { - alloc.recoverMemory(recoverableMemory()); - } + if (writableBytes() >= size) { + // We already have enough space. + return; + } - seg = newSegment; - drop.attach(this); + if (allowCompaction && writableBytes() + readerOffset() >= size) { + // We can solve this with compaction. + compact(); + return; + } + + // Allocate a bigger buffer. + long newSize = capacity() + size - (long) writableBytes(); + Allocator.checkSize(newSize); + RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize); + var newSegment = recoverableMemory.segment; + + // Copy contents. + newSegment.copyFrom(seg); + + // Release old memory segment: + var drop = unsafeGetDrop(); + if (drop instanceof BifurcatedDrop) { + // Disconnect from the bifurcated drop, since we'll get our own fresh memory segment. + int roff = this.roff; + int woff = this.woff; + drop.drop(this); + drop = ((BifurcatedDrop) drop).unwrap(); + unsafeSetDrop(drop); + this.roff = roff; + this.woff = woff; + } else { + alloc.recoverMemory(recoverableMemory()); } + + seg = newSegment; + drop.attach(this); } @Override @@ -384,6 +396,20 @@ public Buf bifurcate() { return bifurcatedBuf; } + @Override + public void compact() { + if (!isOwned()) { + throw new IllegalStateException("Buffer must be owned in order to compact."); + } + int distance = roff; + if (distance == 0) { + return; + } + seg.copyFrom(seg.asSlice(roff, woff - roff)); + roff -= distance; + woff -= distance; + } + // @Override public byte readByte() { diff --git a/src/test/java/io/netty/buffer/api/BufTest.java b/src/test/java/io/netty/buffer/api/BufTest.java index bb5da96..da449cf 100644 --- a/src/test/java/io/netty/buffer/api/BufTest.java +++ b/src/test/java/io/netty/buffer/api/BufTest.java @@ -383,7 +383,7 @@ public void originalBufferMustNotBeAccessibleAfterSend(Fixture fixture) { } } - private void verifyInaccessible(Buf buf) { + private static void verifyInaccessible(Buf buf) { assertThrows(IllegalStateException.class, () -> buf.readByte()); assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte()); assertThrows(IllegalStateException.class, () -> buf.readChar()); @@ -1689,6 +1689,29 @@ public void ensureWritableOnCompositeBuffersMustRespectExistingLittleEndianByteO } } + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void ensureWritableWithCompactionMustNotAllocateIfCompactionIsEnough(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(64)) { + while (buf.writableBytes() > 0) { + buf.writeByte((byte) 42); + } + while (buf.readableBytes() > 0) { + buf.readByte(); + } + buf.ensureWritable(4, true); + buf.writeInt(42); + assertThat(buf.capacity()).isEqualTo(64); + + buf.writerOffset(60).readerOffset(60); + buf.ensureWritable(8, true); + buf.writeLong(42); + // Don't assert the capacity on this one, because single-component + // composite buffers may choose to allocate rather than compact. + } + } + @ParameterizedTest @MethodSource("allocators") public void pooledBuffersMustResetStateBeforeReuse(Fixture fixture) { @@ -2139,7 +2162,7 @@ public void bifurcateOnEmptyLittleEndianCompositeBuffer() { } } - private void verifyBifurcateEmptyCompositeBuffer(Buf buf) { + private static void verifyBifurcateEmptyCompositeBuffer(Buf buf) { try (Buf a = buf.bifurcate()) { a.ensureWritable(4); buf.ensureWritable(4); @@ -2195,6 +2218,43 @@ public void sendMustNotMakeBifurcatedBuffersInaccessible(Fixture fixture) throws } } + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void compactMustDiscardReadBytes(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(16, ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L).writeInt(0x090A0B0C); + assertEquals(0x01020304, buf.readInt()); + assertEquals(12, buf.writerOffset()); + assertEquals(4, buf.readerOffset()); + assertEquals(4, buf.writableBytes()); + assertEquals(8, buf.readableBytes()); + assertEquals(16, buf.capacity()); + buf.compact(); + assertEquals(8, buf.writerOffset()); + assertEquals(0, buf.readerOffset()); + assertEquals(8, buf.writableBytes()); + assertEquals(8, buf.readableBytes()); + assertEquals(16, buf.capacity()); + assertEquals(0x05060708090A0B0CL, buf.readLong()); + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void compactMustThrowForUnownedBuffer(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + assertEquals((byte) 0x01, buf.readByte()); + try (Buf ignore = buf.acquire()) { + assertThrows(IllegalStateException.class, () -> buf.compact()); + assertEquals(1, buf.readerOffset()); + } + assertEquals((byte) 0x02, buf.readByte()); + } + } + // @ParameterizedTest @MethodSource("allocators")