Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions src/main/java/io/netty/buffer/api/Buf.java
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,53 @@ default ByteCursor openReverseCursor() {
* If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then it will be expanded using the {@link Allocator}
* the buffer was created with.
* This method is the same as calling {@link #ensureWritable(int, boolean)} where {@code allowCompaction} is
* {@code false}.
*
* @param size The requested number of bytes of space that should be available for writing.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #countBorrows()} is not {@code 0}.
* That is, if {@link #isOwned()} is {@code false}.
*/
void ensureWritable(int size);
default void ensureWritable(int size) {
ensureWritable(size, true);
}

/**
* Ensure that this buffer has {@linkplain #writableBytes() available space for writing} the given number of
* bytes.
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then space will be made available in one or all of
* the following available ways:
*
* <ul>
* <li>
* If {@code allowCompaction} is {@code true}, and sum of the read and writable bytes would be enough to
* satisfy the request, and it (depending on the buffer implementation) seems faster and easier to compact
* the existing buffer rather than allocation a new buffer, then the requested bytes will be made available
* that way. The compaction will not necessarily work the same way as the {@link #compact()} method, as the
* implementation may be able to make the requested bytes available with less effort than is strictly
* mandated by the {@link #compact()} method.
* </li>
* <li>
* Regardless of the value of the {@code allowCompaction}, the implementation may make more space available
* by just allocating more or larger buffers. This allocation would use the same {@link Allocator} that this
* buffer was created with.
* </li>
* <li>
* If {@code allowCompaction} is {@code true}, then the implementation may choose to do a combination of
* compaction and allocation.
* </li>
* </ul>
*
* @param size The requested number of bytes of space that should be available for writing.
* @param allowCompaction {@code true} if the method is allowed to modify the
* {@linkplain #readerOffset() reader offset} and
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #isOwned()} is {@code false}.
*/
void ensureWritable(int size, boolean allowCompaction);

/**
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
Expand Down Expand Up @@ -417,4 +458,11 @@ default ByteCursor openReverseCursor() {
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
*/
Buf bifurcate();

/**
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer.
*
* The buffer must be {@linkplain #isOwned() owned}, or an exception will be thrown.
*/
void compact();
}
80 changes: 73 additions & 7 deletions src/main/java/io/netty/buffer/api/CompositeBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.Arrays;
import java.util.Objects;

import static jdk.incubator.foreign.MemoryAccess.setByteAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;

final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
/**
* The max array size is JVM implementation dependant, but most seem to settle on {@code Integer.MAX_VALUE - 8}.
Expand Down Expand Up @@ -520,20 +523,54 @@ public int bytesLeft() {
}

@Override
public void ensureWritable(int size) {
public void ensureWritable(int size, boolean allowCompaction) {
if (!isOwned()) {
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
}
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (writableBytes() < size) {
long newSize = capacity() + (long) size;
Allocator.checkSize(newSize);
int growth = size - writableBytes();
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
unsafeExtendWith(extension);
if (writableBytes() >= size) {
// We already have enough space.
return;
}

if (allowCompaction && size <= roff) {
// Let's see if we can solve some or all of the requested size with compaction.
// We always compact as much as is possible, regardless of size. This amortizes our work.
int compactableBuffers = 0;
for (Buf buf : bufs) {
if (buf.capacity() != buf.readerOffset()) {
break;
}
compactableBuffers++;
}
if (compactableBuffers > 0) {
Buf[] compactable;
if (compactableBuffers < bufs.length) {
compactable = new Buf[compactableBuffers];
System.arraycopy(bufs, 0, compactable, 0, compactable.length);
System.arraycopy(bufs, compactable.length, bufs, 0, bufs.length - compactable.length);
System.arraycopy(compactable, 0, bufs, bufs.length - compactable.length, compactable.length);
} else {
compactable = bufs;
}
for (Buf buf : compactable) {
buf.reset();
}
computeBufferOffsets();
if (writableBytes() >= size) {
// Now we have enough space.
return;
}
}
}

long newSize = capacity() + (long) size;
Allocator.checkSize(newSize);
int growth = size - writableBytes();
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
unsafeExtendWith(extension);
}

void extendWith(Buf extension) {
Expand Down Expand Up @@ -600,6 +637,35 @@ public Buf bifurcate() {
}
}

@Override
public void compact() {
if (!isOwned()) {
throw new IllegalStateException("Buffer must be owned in order to compact.");
}
int distance = roff;
if (distance == 0) {
return;
}
int pos = 0;
var oldOrder = order;
order = ByteOrder.BIG_ENDIAN;
try {
var cursor = openCursor();
while (cursor.readLong()) {
setLong(pos, cursor.getLong());
pos += Long.BYTES;
}
while (cursor.readByte()) {
setByte(pos, cursor.getByte());
pos++;
}
} finally {
order = oldOrder;
}
readerOffset(0);
writerOffset(woff - distance);
}

// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
@Override
public byte readByte() {
Expand Down
74 changes: 50 additions & 24 deletions src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,38 +323,50 @@ public int bytesLeft() {
}

@Override
public void ensureWritable(int size) {
public void ensureWritable(int size, boolean allowCompaction) {
if (!isOwned()) {
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
}
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (writableBytes() < size) {
long newSize = capacity() + size - (long) writableBytes();
Allocator.checkSize(newSize);
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
var newSegment = recoverableMemory.segment;
newSegment.copyFrom(seg);

// Release old memory segment:
var drop = unsafeGetDrop();
if (drop instanceof BifurcatedDrop) {
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ((BifurcatedDrop) drop).unwrap();
unsafeSetDrop(drop);
this.roff = roff;
this.woff = woff;
} else {
alloc.recoverMemory(recoverableMemory());
}
if (writableBytes() >= size) {
// We already have enough space.
return;
}

seg = newSegment;
drop.attach(this);
if (allowCompaction && writableBytes() + readerOffset() >= size) {
// We can solve this with compaction.
compact();
return;
}

// Allocate a bigger buffer.
long newSize = capacity() + size - (long) writableBytes();
Allocator.checkSize(newSize);
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
var newSegment = recoverableMemory.segment;

// Copy contents.
newSegment.copyFrom(seg);

// Release old memory segment:
var drop = unsafeGetDrop();
if (drop instanceof BifurcatedDrop) {
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ((BifurcatedDrop) drop).unwrap();
unsafeSetDrop(drop);
this.roff = roff;
this.woff = woff;
} else {
alloc.recoverMemory(recoverableMemory());
}

seg = newSegment;
drop.attach(this);
}

@Override
Expand Down Expand Up @@ -384,6 +396,20 @@ public Buf bifurcate() {
return bifurcatedBuf;
}

@Override
public void compact() {
if (!isOwned()) {
throw new IllegalStateException("Buffer must be owned in order to compact.");
}
int distance = roff;
if (distance == 0) {
return;
}
seg.copyFrom(seg.asSlice(roff, woff - roff));
roff -= distance;
woff -= distance;
}

// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
@Override
public byte readByte() {
Expand Down
64 changes: 62 additions & 2 deletions src/test/java/io/netty/buffer/api/BufTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public void originalBufferMustNotBeAccessibleAfterSend(Fixture fixture) {
}
}

private void verifyInaccessible(Buf buf) {
private static void verifyInaccessible(Buf buf) {
assertThrows(IllegalStateException.class, () -> buf.readByte());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte());
assertThrows(IllegalStateException.class, () -> buf.readChar());
Expand Down Expand Up @@ -1689,6 +1689,29 @@ public void ensureWritableOnCompositeBuffersMustRespectExistingLittleEndianByteO
}
}

@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableWithCompactionMustNotAllocateIfCompactionIsEnough(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(64)) {
while (buf.writableBytes() > 0) {
buf.writeByte((byte) 42);
}
while (buf.readableBytes() > 0) {
buf.readByte();
}
buf.ensureWritable(4, true);
buf.writeInt(42);
assertThat(buf.capacity()).isEqualTo(64);

buf.writerOffset(60).readerOffset(60);
buf.ensureWritable(8, true);
buf.writeLong(42);
// Don't assert the capacity on this one, because single-component
// composite buffers may choose to allocate rather than compact.
}
}

@ParameterizedTest
@MethodSource("allocators")
public void pooledBuffersMustResetStateBeforeReuse(Fixture fixture) {
Expand Down Expand Up @@ -2139,7 +2162,7 @@ public void bifurcateOnEmptyLittleEndianCompositeBuffer() {
}
}

private void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
private static void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
try (Buf a = buf.bifurcate()) {
a.ensureWritable(4);
buf.ensureWritable(4);
Expand Down Expand Up @@ -2195,6 +2218,43 @@ public void sendMustNotMakeBifurcatedBuffersInaccessible(Fixture fixture) throws
}
}

@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void compactMustDiscardReadBytes(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(16, ByteOrder.BIG_ENDIAN)) {
buf.writeLong(0x0102030405060708L).writeInt(0x090A0B0C);
assertEquals(0x01020304, buf.readInt());
assertEquals(12, buf.writerOffset());
assertEquals(4, buf.readerOffset());
assertEquals(4, buf.writableBytes());
assertEquals(8, buf.readableBytes());
assertEquals(16, buf.capacity());
buf.compact();
assertEquals(8, buf.writerOffset());
assertEquals(0, buf.readerOffset());
assertEquals(8, buf.writableBytes());
assertEquals(8, buf.readableBytes());
assertEquals(16, buf.capacity());
assertEquals(0x05060708090A0B0CL, buf.readLong());
}
}

@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void compactMustThrowForUnownedBuffer(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
buf.writeLong(0x0102030405060708L);
assertEquals((byte) 0x01, buf.readByte());
try (Buf ignore = buf.acquire()) {
assertThrows(IllegalStateException.class, () -> buf.compact());
assertEquals(1, buf.readerOffset());
}
assertEquals((byte) 0x02, buf.readByte());
}
}

// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
@ParameterizedTest
@MethodSource("allocators")
Expand Down