Skip to content

Commit

Permalink
Faster Recycler's claim/release (Fixes #13153) (#13220)
Browse files Browse the repository at this point in the history
Motivation:

Recycler's claim/release can be made faster by saving expensive volatile ops, when not needed. For claim, always, while for release, if the owner thread is performing release itself.

Modification:

Replacing expensive volatile ops with ordered ones.

Result:

Faster Recycler's claim/release
  • Loading branch information
franz1981 committed Mar 7, 2023
1 parent 84cf7d6 commit c353f4f
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 53 deletions.
Expand Up @@ -16,6 +16,7 @@

package io.netty.buffer;

import io.netty.util.Recycler.EnhancedHandle;
import io.netty.util.internal.ObjectPool.Handle;

import java.nio.ByteBuffer;
Expand All @@ -26,7 +27,7 @@
*/
abstract class AbstractPooledDerivedByteBuf extends AbstractReferenceCountedByteBuf {

private final Handle<AbstractPooledDerivedByteBuf> recyclerHandle;
private final EnhancedHandle<AbstractPooledDerivedByteBuf> recyclerHandle;
private AbstractByteBuf rootParent;
/**
* Deallocations of a pooled derived buffer should always propagate through the entire chain of derived buffers.
Expand All @@ -39,7 +40,7 @@ abstract class AbstractPooledDerivedByteBuf extends AbstractReferenceCountedByte
@SuppressWarnings("unchecked")
AbstractPooledDerivedByteBuf(Handle<? extends AbstractPooledDerivedByteBuf> recyclerHandle) {
super(0);
this.recyclerHandle = (Handle<AbstractPooledDerivedByteBuf>) recyclerHandle;
this.recyclerHandle = (EnhancedHandle<AbstractPooledDerivedByteBuf>) recyclerHandle;
}

// Called from within SimpleLeakAwareByteBuf and AdvancedLeakAwareByteBuf.
Expand Down Expand Up @@ -82,7 +83,7 @@ protected final void deallocate() {
// otherwise it is possible that the same AbstractPooledDerivedByteBuf is again obtained and init(...) is
// called before we actually have a chance to call release(). This leads to call release() on the wrong parent.
ByteBuf parent = this.parent;
recyclerHandle.recycle(this);
recyclerHandle.unguardedRecycle(this);
parent.release();
}

Expand Down
13 changes: 7 additions & 6 deletions buffer/src/main/java/io/netty/buffer/ByteBufUtil.java
Expand Up @@ -19,6 +19,7 @@
import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.Recycler.EnhancedHandle;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.ObjectPool;
Expand Down Expand Up @@ -1632,11 +1633,11 @@ static ThreadLocalUnsafeDirectByteBuf newInstance() {
return buf;
}

private final Handle<ThreadLocalUnsafeDirectByteBuf> handle;
private final EnhancedHandle<ThreadLocalUnsafeDirectByteBuf> handle;

private ThreadLocalUnsafeDirectByteBuf(Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
this.handle = (EnhancedHandle<ThreadLocalUnsafeDirectByteBuf>) handle;
}

@Override
Expand All @@ -1645,7 +1646,7 @@ protected void deallocate() {
super.deallocate();
} else {
clear();
handle.recycle(this);
handle.unguardedRecycle(this);
}
}
}
Expand All @@ -1666,11 +1667,11 @@ static ThreadLocalDirectByteBuf newInstance() {
return buf;
}

private final Handle<ThreadLocalDirectByteBuf> handle;
private final EnhancedHandle<ThreadLocalDirectByteBuf> handle;

private ThreadLocalDirectByteBuf(Handle<ThreadLocalDirectByteBuf> handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
this.handle = (EnhancedHandle<ThreadLocalDirectByteBuf>) handle;
}

@Override
Expand All @@ -1679,7 +1680,7 @@ protected void deallocate() {
super.deallocate();
} else {
clear();
handle.recycle(this);
handle.unguardedRecycle(this);
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions buffer/src/main/java/io/netty/buffer/PoolThreadCache.java
Expand Up @@ -20,6 +20,7 @@
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;

import io.netty.buffer.PoolArena.SizeClass;
import io.netty.util.Recycler.EnhancedHandle;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.ObjectPool.Handle;
Expand Down Expand Up @@ -377,7 +378,7 @@ public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
boolean queued = queue.offer(entry);
if (!queued) {
// If it was not possible to cache the chunk, immediately recycle the entry
entry.recycle();
entry.unguardedRecycle();
}

return queued;
Expand All @@ -392,7 +393,7 @@ public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadC
return false;
}
initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
entry.recycle();
entry.unguardedRecycle();

// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
Expand Down Expand Up @@ -451,14 +452,14 @@ private void freeEntry(Entry entry, boolean finalizer) {
}

static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
final EnhancedHandle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
ByteBuffer nioBuffer;
long handle = -1;
int normCapacity;

Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
this.recyclerHandle = (EnhancedHandle<Entry<?>>) recyclerHandle;
}

void recycle() {
Expand All @@ -467,6 +468,13 @@ void recycle() {
handle = -1;
recyclerHandle.recycle(this);
}

void unguardedRecycle() {
chunk = null;
nioBuffer = null;
handle = -1;
recyclerHandle.unguardedRecycle(this);
}
}

@SuppressWarnings("rawtypes")
Expand Down
11 changes: 4 additions & 7 deletions buffer/src/main/java/io/netty/buffer/PooledByteBuf.java
Expand Up @@ -16,6 +16,7 @@

package io.netty.buffer;

import io.netty.util.Recycler.EnhancedHandle;
import io.netty.util.internal.ObjectPool.Handle;

import java.io.IOException;
Expand All @@ -28,7 +29,7 @@

abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {

private final Handle<PooledByteBuf<T>> recyclerHandle;
private final EnhancedHandle<PooledByteBuf<T>> recyclerHandle;

protected PoolChunk<T> chunk;
protected long handle;
Expand All @@ -43,7 +44,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
@SuppressWarnings("unchecked")
protected PooledByteBuf(Handle<? extends PooledByteBuf<T>> recyclerHandle, int maxCapacity) {
super(maxCapacity);
this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;
this.recyclerHandle = (EnhancedHandle<PooledByteBuf<T>>) recyclerHandle;
}

void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
Expand Down Expand Up @@ -177,14 +178,10 @@ protected final void deallocate() {
tmpNioBuf = null;
chunk = null;
cache = null;
recycle();
this.recyclerHandle.unguardedRecycle(this);
}
}

private void recycle() {
recyclerHandle.recycle(this);
}

protected final int idx(int index) {
return offset + index;
}
Expand Down
47 changes: 41 additions & 6 deletions common/src/main/java/io/netty/util/Recycler.java
Expand Up @@ -20,6 +20,7 @@
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.jctools.queues.MessagePassingQueue;
Expand All @@ -40,12 +41,17 @@
*/
public abstract class Recycler<T> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
private static final Handle<?> NOOP_HANDLE = new Handle<Object>() {
private static final EnhancedHandle<?> NOOP_HANDLE = new EnhancedHandle<Object>() {
@Override
public void recycle(Object object) {
// NOOP
}

@Override
public void unguardedRecycle(final Object object) {
// NOOP
}

@Override
public String toString() {
return "NOOP_HANDLE";
Expand Down Expand Up @@ -216,7 +222,16 @@ final int threadLocalSize() {
@SuppressWarnings("ClassNameSameAsAncestorName") // Can't change this due to compatibility.
public interface Handle<T> extends ObjectPool.Handle<T> { }

private static final class DefaultHandle<T> implements Handle<T> {
@UnstableApi
public abstract static class EnhancedHandle<T> implements Handle<T> {

public abstract void unguardedRecycle(Object object);

private EnhancedHandle() {
}
}

private static final class DefaultHandle<T> extends EnhancedHandle<T> {
private static final int STATE_CLAIMED = 0;
private static final int STATE_AVAILABLE = 1;
private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
Expand All @@ -239,7 +254,15 @@ public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
localPool.release(this);
localPool.release(this, true);
}

@Override
public void unguardedRecycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
localPool.release(this, false);
}

T get() {
Expand All @@ -252,7 +275,7 @@ void set(T value) {

void toClaimed() {
assert state == STATE_AVAILABLE;
state = STATE_CLAIMED;
STATE_UPDATER.lazySet(this, STATE_CLAIMED);
}

void toAvailable() {
Expand All @@ -261,6 +284,14 @@ void toAvailable() {
throw new IllegalStateException("Object has been recycled already.");
}
}

void unguardedToAvailable() {
int prev = state;
if (prev == STATE_AVAILABLE) {
throw new IllegalStateException("Object has been recycled already.");
}
STATE_UPDATER.lazySet(this, STATE_AVAILABLE);
}
}

private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
Expand Down Expand Up @@ -301,8 +332,12 @@ DefaultHandle<T> claim() {
return handle;
}

void release(DefaultHandle<T> handle) {
handle.toAvailable();
void release(DefaultHandle<T> handle, boolean guarded) {
if (guarded) {
handle.toAvailable();
} else {
handle.unguardedToAvailable();
}
Thread owner = this.owner;
if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {
accept(handle);
Expand Down
Expand Up @@ -112,7 +112,8 @@ public final void setRefCnt(T instance, int refCnt) {
* Resets the reference count to 1
*/
public final void resetRefCnt(T instance) {
updater().set(instance, initialValue());
// no need of a volatile set, it should happen in a quiescent state
updater().lazySet(instance, initialValue());
}

public final T retain(T instance) {
Expand Down

0 comments on commit c353f4f

Please sign in to comment.