Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/main/java/io/netty/buffer/api/Allocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ default Buf allocate(int size, ByteOrder order) {
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent {@linkplain Buf#order() byte order}.
*/
default Buf compose(Buf... bufs) {
default Buf compose(Deref<Buf>... bufs) {
return new CompositeBuf(this, bufs);
}

Expand All @@ -117,8 +117,8 @@ default Buf compose(Buf... bufs) {
* the composite buffer was created.
* The composite buffer is modified in-place.
*
* @see #compose(Buf...)
* @param composite The composite buffer (from a prior {@link #compose(Buf...)} call) to extend with the given
* @see #compose(Deref...)
* @param composite The composite buffer (from a prior {@link #compose(Deref...)} call) to extend with the given
* extension buffer.
* @param extension The buffer to extend the composite buffer with.
*/
Expand All @@ -133,9 +133,9 @@ static void extend(Buf composite, Buf extension) {
}

/**
* Check if the given buffer is a {@linkplain #compose(Buf...) composite} buffer or not.
* Check if the given buffer is a {@linkplain #compose(Deref...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(Buf...)}, {@code false} otherwise.
* @return {@code true} if the given buffer was created with {@link #compose(Deref...)}, {@code false} otherwise.
*/
static boolean isComposite(Buf composite) {
return composite.getClass() == CompositeBuf.class;
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/netty/buffer/api/Buf.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
* To send a buffer to another thread, the buffer must not have any outstanding borrows.
* That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()};
* all {@linkplain #slice() slices} must have been closed.
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Buf...) composite buffer},
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Deref...) composite buffer},
* then that composite buffer must be closed.
* And if this buffer is itself a composite buffer, then it must own all of its constituent buffers.
* The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not.
Expand Down Expand Up @@ -439,6 +439,8 @@ default void ensureWritable(int size) {
/**
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
* <p>
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* <p>
* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently
* {@linkplain #send() sent} to other threads.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/netty/buffer/api/BufHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public int countBorrows() {
return buf.countBorrows();
}

@SuppressWarnings("unchecked")
@Override
public Send<T> send() {
var send = buf.send();
return () -> receive(send.receive());
return buf.send().map((Class<T>) getClass(), this::receive);
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/netty/buffer/api/ComponentProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ interface ReadableComponent {
* @return A new {@link ByteBuffer}, with its own position and limit, for this memory component.
*/
ByteBuffer readableBuffer();
// todo for Unsafe-based impl, DBB.attachment needs to keep underlying memory alive
}

/**
Expand Down
94 changes: 64 additions & 30 deletions src/main/java/io/netty/buffer/api/CompositeBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,68 +54,102 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private boolean closed;
private boolean readOnly;

CompositeBuf(Allocator allocator, Buf[] bufs) {
this(allocator, true, filterExternalBufs(bufs), COMPOSITE_DROP);
CompositeBuf(Allocator allocator, Deref<Buf>[] refs) {
this(allocator, true, filterExternalBufs(refs), COMPOSITE_DROP, false);
}

private static Buf[] filterExternalBufs(Buf[] bufs) {
private static Buf[] filterExternalBufs(Deref<Buf>[] refs) {
// We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway,
// and also, by ensuring that all constituent buffers contribute to the size of the composite buffer,
// we make sure that the number of composite buffers will never become greater than the number of bytes in
// the composite buffer.
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
// will never overflow their component counts.
// Allocating a new array unconditionally also prevents external modification of the array.
bufs = Arrays.stream(bufs)
.filter(b -> b.capacity() > 0)
Buf[] bufs = Arrays.stream(refs)
.map(r -> r.get()) // Increments reference counts.
.filter(CompositeBuf::discardEmpty)
.flatMap(CompositeBuf::flattenBuffer)
.toArray(Buf[]::new);
// Make sure there are no duplicates among the buffers.
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
if (duplicatesCheck.size() < bufs.length) {
for (Buf buf : bufs) {
buf.close(); // Undo the increment we did with Deref.get().
}
throw new IllegalArgumentException(
"Cannot create composite buffer with duplicate constituent buffer components.");
}
return bufs;
}

private static boolean discardEmpty(Buf buf) {
if (buf.capacity() > 0) {
return true;
} else {
// If we filter a buffer out, then we must make sure to close it since we incremented the reference count
// with Deref.get() earlier.
buf.close();
return false;
}
}

private static Stream<Buf> flattenBuffer(Buf buf) {
if (buf instanceof CompositeBuf) {
return Stream.of(((CompositeBuf) buf).bufs);
// Extract components and move our reference count from the composite onto the components.
var composite = (CompositeBuf) buf;
var bufs = composite.bufs;
for (Buf b : bufs) {
b.acquire();
}
buf.close(); // Important: acquire on components *before* closing composite.
return Stream.of(bufs);
}
return Stream.of(buf);
}

private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop,
boolean acquireBufs) {
super(drop);
this.allocator = allocator;
this.isSendable = isSendable;
for (Buf buf : bufs) {
buf.acquire();
}
if (bufs.length > 0) {
ByteOrder targetOrder = bufs[0].order();
if (acquireBufs) {
for (Buf buf : bufs) {
if (buf.order() != targetOrder) {
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
}
buf.acquire();
}
order = bufs[0].order();
}
try {
if (bufs.length > 0) {
ByteOrder targetOrder = bufs[0].order();
for (Buf buf : bufs) {
if (buf.order() != targetOrder) {
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
}
}
order = bufs[0].order();

boolean targetReadOnly = bufs[0].readOnly();
for (Buf buf : bufs) {
if (buf.readOnly() != targetReadOnly) {
throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state.");
boolean targetReadOnly = bufs[0].readOnly();
for (Buf buf : bufs) {
if (buf.readOnly() != targetReadOnly) {
throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state.");
}
}
readOnly = targetReadOnly;
} else {
order = ByteOrder.nativeOrder();
}
readOnly = targetReadOnly;
} else {
order = ByteOrder.nativeOrder();
this.bufs = bufs;
computeBufferOffsets();
tornBufAccessors = new TornBufAccessors(this);
} catch (Exception e) {
// Always close bufs on exception, regardless of acquireBufs value.
// If acquireBufs is false, it just means the ref count increments happened prior to this constructor call.
for (Buf buf : bufs) {
buf.close();
}
throw e;
}
this.bufs = bufs;
computeBufferOffsets();
tornBufAccessors = new TornBufAccessors(this);
}

private void computeBufferOffsets() {
Expand Down Expand Up @@ -292,7 +326,7 @@ public Buf slice(int offset, int length) {
slices = new Buf[] { choice.slice(subOffset, 0) };
}

return new CompositeBuf(allocator, false, slices, drop);
return new CompositeBuf(allocator, false, slices, drop, true);
} catch (Throwable throwable) {
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
close();
Expand Down Expand Up @@ -718,7 +752,7 @@ public Buf bifurcate() {
}
if (bufs.length == 0) {
// Bifurcating a zero-length buffer is trivial.
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order);
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop(), true).order(order);
}

int i = searchOffsets(woff);
Expand All @@ -730,7 +764,7 @@ public Buf bifurcate() {
}
computeBufferOffsets();
try {
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop());
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop(), true);
compositeBuf.order = order; // Preserve byte order even if bifs array is empty.
return compositeBuf;
} finally {
Expand Down Expand Up @@ -1133,7 +1167,7 @@ public CompositeBuf transferOwnership(Drop<CompositeBuf> drop) {
for (int i = 0; i < sends.length; i++) {
received[i] = sends[i].receive();
}
var composite = new CompositeBuf(allocator, true, received, drop);
var composite = new CompositeBuf(allocator, true, received, drop, true);
composite.readOnly = readOnly;
drop.attach(composite);
return composite;
Expand Down
52 changes: 52 additions & 0 deletions src/main/java/io/netty/buffer/api/Deref.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;

import java.util.function.Supplier;

/**
* A Deref provides the capability to acquire a reference to a {@linkplain Rc reference counted} object.
* <p>
* <strong>Note:</strong> Callers must ensure that they close any references they obtain.
* <p>
* Deref itself does not specify if a reference can be obtained more than once.
* For instance, any {@link Send} object is also a {@code Deref}, but the reference can only be acquired once.
* Meanwhile, {@link Rc} objects are themselves their own {@code Derefs}, and permit references to be acquired multiple
* times.
*
* @param <T> The concrete type of reference counted object that can be obtained.
*/
public interface Deref<T extends Rc<T>> extends Supplier<T> {
/**
* Acquire a reference to the reference counted object.
* <p>
* <strong>Note:</strong> This call increments the reference count of the acquired object, and must be paired with
* a {@link Rc#close()} call.
* Using a try-with-resources clause is the easiest way to ensure this.
*
* @return A reference to the reference counted object.
*/
@Override
T get();

/**
* Determine if the object in this {@code Deref} is an instance of the given class.
*
* @param cls The type to check.
* @return {@code true} if the object in this {@code Deref} can be assigned fields or variables of the given type.
*/
boolean isInstanceOf(Class<?> cls);
}
12 changes: 11 additions & 1 deletion src/main/java/io/netty/buffer/api/Rc.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*
* @param <I> The concrete subtype.
*/
public interface Rc<I extends Rc<I>> extends AutoCloseable {
public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
/**
* Increment the reference count.
* <p>
Expand All @@ -36,6 +36,16 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
*/
I acquire();

@Override
default I get() {
return acquire();
}

@Override
default boolean isInstanceOf(Class<?> cls) {
return cls.isInstance(this);
}

/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
* <p>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/netty/buffer/api/RcSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public final Send<I> send() {
}
var owned = prepareSend();
acquires = -2; // Close without dropping. This also ignore future double-free attempts.
return new TransferSend<I, T>(owned, drop);
return new TransferSend<I, T>(owned, drop, getClass());
}

/**
Expand Down
Loading