Skip to content

Commit

Permalink
Make derived buffers recyclable / Add missing overrides to ByteBuf impls
Browse files Browse the repository at this point in the history
Related: #4333 #4421

Motivation:

slice(), duplicate() and readSlice() currently create a non-recyclable
derived buffer instance. Under heavy load, an application that creates a
lot of derived buffers can put the garbage collector under pressure.

Modifications:

- Deprecate the old derived buffer implementations
- Add the new recyclable derived buffer implementations, which has its
  own reference count value
  - When a derived buffer is created, its internal reference count is 0.
  - When retain() is called on a derived buffer, the internal reference
    count becomes a positive value and the original buffer's retain() is
    called.
  - When release() is called on a derived buffer, the internal reference
    count is decreased first, and then the original buffer's release()
    is called when the internal reference count is 0.
- Add ByteBufUtil.duplicate/slice() so that a user can easily implement
  ByteBuf.duplicate/slice()
- Add missing overrides in some ByteBuf impls
- Fix incorrect range checks in SlicedByteBuf
- Miscellaneous:
  - Merge Duplicated/SlicedAbstractByteBuf.unwrap0() into unwrap() using
    covariant return type

Result:

- Derived buffers are now recycled when retained and released, although
  they are not recycled if a user called release() against the original
  buffer.

    buf.slice().retain().release(); // recycled
    buf.slice().retain(); buf.release(); // not recycled

- Correct range checks in SlicedByteBuf
  • Loading branch information
trustin committed Apr 13, 2016
1 parent de2515d commit 46e3ce7
Show file tree
Hide file tree
Showing 67 changed files with 2,569 additions and 177 deletions.
30 changes: 26 additions & 4 deletions buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java
@@ -1,5 +1,5 @@
/*
* Copyright 2012 The Netty Project
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand Down Expand Up @@ -799,6 +799,13 @@ public ByteBuf readSlice(int length) {
return slice;
}

@Override
public ByteBuf readSliceRetained(int length) {
ByteBuf slice = sliceRetained(readerIndex, length);
readerIndex += length;
return slice;
}

@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
checkReadableBytes(length);
Expand Down Expand Up @@ -1112,17 +1119,32 @@ public ByteBuf copy() {

@Override
public ByteBuf duplicate() {
return new DuplicatedAbstractByteBuf(this);
return ByteBufUtil.duplicate(this);
}

@Override
public ByteBuf duplicateRetained() {
return duplicate().retain();
}

@Override
public ByteBuf slice() {
return slice(readerIndex, readableBytes());
return ByteBufUtil.slice(this);
}

@Override
public ByteBuf sliceRetained() {
return slice().retain();
}

@Override
public ByteBuf slice(int index, int length) {
return new SlicedAbstractByteBuf(this, index, length);
return ByteBufUtil.slice(this, index, length);
}

@Override
public ByteBuf sliceRetained(int index, int length) {
return slice(index, length).retain();
}

@Override
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2012 The Netty Project
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2013 The Netty Project
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -21,7 +21,10 @@
/**
* Abstract base class for {@link ByteBuf} implementations that wrap another
* {@link ByteBuf}.
*
* @deprecated Do not use.
*/
@Deprecated
public abstract class AbstractDerivedByteBuf extends AbstractByteBuf {

protected AbstractDerivedByteBuf(int maxCapacity) {
Expand Down
217 changes: 217 additions & 0 deletions buffer/src/main/java/io/netty/buffer/AbstractPooledDerivedByteBuf.java
@@ -0,0 +1,217 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.buffer;

import io.netty.util.IllegalReferenceCountException;
import io.netty.util.Recycler.Handle;
import io.netty.util.internal.PlatformDependent;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
* Abstract base class for derived {@link ByteBuf} implementations.
*/
abstract class AbstractPooledDerivedByteBuf<T> extends AbstractByteBuf {

@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<AbstractPooledDerivedByteBuf> refCntUpdater;

static {
@SuppressWarnings("rawtypes")
AtomicIntegerFieldUpdater<AbstractPooledDerivedByteBuf> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(AbstractPooledDerivedByteBuf.class, "refCnt");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(AbstractPooledDerivedByteBuf.class, "refCnt");
}
refCntUpdater = updater;
}

private final Handle<AbstractPooledDerivedByteBuf<T>> recyclerHandle;
private AbstractByteBuf buffer;
private volatile int refCnt;

@SuppressWarnings("unchecked")
AbstractPooledDerivedByteBuf(Handle<? extends AbstractPooledDerivedByteBuf<T>> recyclerHandle) {
super(0);
this.recyclerHandle = (Handle<AbstractPooledDerivedByteBuf<T>>) recyclerHandle;
}

@Override
public final AbstractByteBuf unwrap() {
return buffer;
}

final <U extends AbstractPooledDerivedByteBuf<T>> U init(
AbstractByteBuf buffer, int readerIndex, int writerIndex, int maxCapacity) {

this.buffer = buffer;
maxCapacity(maxCapacity);
refCnt = 0;

setIndex(readerIndex, writerIndex);

@SuppressWarnings("unchecked")
final U castThis = (U) this;

return castThis;
}

@Override
public final int refCnt() {
return unwrap().refCnt();
}

@Override
public final ByteBuf retain() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
if (refCnt == 0) {
unwrap().retain();
}
break;
}
}
return this;
}

@Override
public final ByteBuf retain(int increment) {
if (increment <= 0) {
throw new IllegalArgumentException("increment: " + increment + " (expected: > 0)");
}

for (;;) {
int refCnt = this.refCnt;
if (refCnt > Integer.MAX_VALUE - increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + increment)) {
if (refCnt == 0) {
unwrap().retain();
}
break;
}
}
return this;
}

@Override
public final ByteBuf touch() {
return this;
}

@Override
public final ByteBuf touch(Object hint) {
return this;
}

@Override
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
return unwrap().release();
}

if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
recyclerHandle.recycle(this);
return unwrap().release();
} else {
return unwrap().refCnt() == 0;
}
}
}
}

@Override
public final boolean release(int decrement) {
if (decrement <= 0) {
throw new IllegalArgumentException("decrement: " + decrement + " (expected: > 0)");
}

for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
return unwrap().release(decrement);
}

if (refCnt >= decrement) {
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
recyclerHandle.recycle(this);
return unwrap().release();
} else {
return unwrap().refCnt() == 0;
}
}
} else { // refCnt < decrement
if (refCntUpdater.compareAndSet(this, refCnt, 0)) {
recyclerHandle.recycle(this);
return unwrap().release(decrement - refCnt + 1);
}
}
}
}

@Override
public final ByteBufAllocator alloc() {
return buffer.alloc();
}

@Override
@Deprecated
public final ByteOrder order() {
return buffer.order();
}

@Override
public final boolean isDirect() {
return buffer.isDirect();
}

@Override
public boolean hasArray() {
return buffer.hasArray();
}

@Override
public byte[] array() {
return buffer.array();
}

@Override
public boolean hasMemoryAddress() {
return buffer.hasMemoryAddress();
}

@Override
public final int nioBufferCount() {
return buffer.nioBufferCount();
}

@Override
public final ByteBuffer internalNioBuffer(int index, int length) {
return nioBuffer(index, length);
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2013 The Netty Project
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2015 The Netty Project
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -24,6 +24,7 @@
/**
* Special {@link SwappedByteBuf} for {@link ByteBuf}s that is using unsafe.
*/
@Deprecated
abstract class AbstractUnsafeSwappedByteBuf extends SwappedByteBuf {
private final boolean nativeByteOrder;
private final AbstractByteBuf wrapped;
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2013 The Netty Project
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand Down Expand Up @@ -76,24 +76,48 @@ public ByteBuf slice() {
return new AdvancedLeakAwareByteBuf(super.slice(), leak);
}

@Override
public ByteBuf sliceRetained() {
recordLeakNonRefCountingOperation(leak);
return new AdvancedLeakAwareByteBuf(super.sliceRetained(), leak);
}

@Override
public ByteBuf slice(int index, int length) {
recordLeakNonRefCountingOperation(leak);
return new AdvancedLeakAwareByteBuf(super.slice(index, length), leak);
}

@Override
public ByteBuf sliceRetained(int index, int length) {
recordLeakNonRefCountingOperation(leak);
return new AdvancedLeakAwareByteBuf(super.sliceRetained(index, length), leak);
}

@Override
public ByteBuf duplicate() {
recordLeakNonRefCountingOperation(leak);
return new AdvancedLeakAwareByteBuf(super.duplicate(), leak);
}

@Override
public ByteBuf duplicateRetained() {
recordLeakNonRefCountingOperation(leak);
return new AdvancedLeakAwareByteBuf(super.duplicateRetained(), leak);
}

@Override
public ByteBuf readSlice(int length) {
recordLeakNonRefCountingOperation(leak);
return new AdvancedLeakAwareByteBuf(super.readSlice(length), leak);
}

@Override
public ByteBuf readSliceRetained(int length) {
recordLeakNonRefCountingOperation(leak);
return new AdvancedLeakAwareByteBuf(super.readSliceRetained(length), leak);
}

@Override
public ByteBuf discardReadBytes() {
recordLeakNonRefCountingOperation(leak);
Expand Down

0 comments on commit 46e3ce7

Please sign in to comment.