Skip to content

Commit

Permalink
retained[Slice|Duplicate] buffer reference count bug
Browse files Browse the repository at this point in the history
Motivation:
Currently the ByteBuf created as a result of retained[Slice|Duplicate] maintains its own reference count, and when this reference count is depleated it will release the ByteBuf returned from unwrap(). The unwrap() buffer is designed to be the 'root parent' and will skip all intermediate layers of buffers. If the intermediate layers of buffers contain a retained[Slice|Duplicate] then these reference counts will be ignored during deallocation. This may lead to deallocating the 'root parent' before all derived pooled buffers are actually released. This same issue holds if a retained[Slice|Duplicate] is in the heirachy and a 'regular' slice() or duplicate() buffer is created.

Modifications:
- AbstractPooledDerivedByteBuf must maintain a reference to the direct parent (the buffer which retained[Slice|Duplicate] was called on) and release on this buffer instead of the 'root parent' returned by unwrap()
- slice() and duplicate() buffers created from AbstractPooledDerivedByteBuf must also delegate reference count operations to their immediate parent (or first ancestor which maintains an independent reference count).

Result:
Fixes netty#5999
  • Loading branch information
Scottmitch authored and liuzhengyang committed Sep 10, 2017
1 parent fe03759 commit af2e77f
Show file tree
Hide file tree
Showing 9 changed files with 674 additions and 33 deletions.
28 changes: 28 additions & 0 deletions buffer/src/main/java/io/netty/buffer/AbstractDerivedByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,59 @@ protected AbstractDerivedByteBuf(int maxCapacity) {

@Override
public final int refCnt() {
return refCnt0();
}

int refCnt0() {
return unwrap().refCnt();
}

@Override
public final ByteBuf retain() {
return retain0();
}

ByteBuf retain0() {
unwrap().retain();
return this;
}

@Override
public final ByteBuf retain(int increment) {
return retain0(increment);
}

ByteBuf retain0(int increment) {
unwrap().retain(increment);
return this;
}

@Override
public final ByteBuf touch() {
return touch0();
}

ByteBuf touch0() {
unwrap().touch();
return this;
}

@Override
public final ByteBuf touch(Object hint) {
return touch0(hint);
}

ByteBuf touch0(Object hint) {
unwrap().touch(hint);
return this;
}

@Override
public final boolean release() {
return release0();
}

boolean release0() {
return unwrap().release();
}

Expand All @@ -70,6 +94,10 @@ public final boolean release(int decrement) {
return unwrap().release(decrement);
}

boolean release0(int decrement) {
return unwrap().release(decrement);
}

@Override
public boolean isReadOnly() {
return unwrap().isReadOnly();
Expand Down
193 changes: 183 additions & 10 deletions buffer/src/main/java/io/netty/buffer/AbstractPooledDerivedByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.netty.buffer;

import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCounted;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand All @@ -27,7 +28,14 @@
abstract class AbstractPooledDerivedByteBuf extends AbstractReferenceCountedByteBuf {

private final Handle<AbstractPooledDerivedByteBuf> recyclerHandle;
private AbstractByteBuf buffer;
private AbstractByteBuf rootParent;
/**
* Deallocations of a pooled derived buffer should always propagate through the entire chain of derived buffers.
* This is because each pooled derived buffer maintains its own reference count and we should respect each one.
* If deallocations cause a release of the "root parent" then then we may prematurely release the underlying
* content before all the derived buffers have been released.
*/
private ByteBuf parent;

@SuppressWarnings("unchecked")
AbstractPooledDerivedByteBuf(Handle<? extends AbstractPooledDerivedByteBuf> recyclerHandle) {
Expand All @@ -37,14 +45,14 @@ abstract class AbstractPooledDerivedByteBuf extends AbstractReferenceCountedByte

@Override
public final AbstractByteBuf unwrap() {
return buffer;
return rootParent;
}

final <U extends AbstractPooledDerivedByteBuf> U init(
AbstractByteBuf unwrapped, ByteBuf wrapped, int readerIndex, int writerIndex, int maxCapacity) {

wrapped.retain(); // Retain up front to ensure the wrapped buffer is accessible before doing more work.
this.buffer = unwrapped;
wrapped.retain(); // Retain up front to ensure the parent is accessible before doing more work.
parent = wrapped;
rootParent = unwrapped;

try {
maxCapacity(maxCapacity);
Expand All @@ -57,20 +65,20 @@ final <U extends AbstractPooledDerivedByteBuf> U init(
return castThis;
} finally {
if (wrapped != null) {
this.buffer = null;
parent = rootParent = null;
wrapped.release();
}
}
}

@Override
protected final void deallocate() {
// We need to first store a reference to the wrapped buffer before recycle this instance. This is needed as
// We need to first store a reference to the parent before recycle this instance. This is needed as
// otherwise it is possible that the same AbstractPooledDerivedByteBuf is again obtained and init(...) is
// called before we actually have a chance to call release(). This leads to call release() on the wrong buffer.
ByteBuf wrapped = unwrap();
// called before we actually have a chance to call release(). This leads to call release() on the wrong parent.
ByteBuf parent = this.parent;
recyclerHandle.recycle(this);
wrapped.release();
parent.release();
}

@Override
Expand Down Expand Up @@ -124,4 +132,169 @@ public final ByteBuf retainedSlice() {
final int index = readerIndex();
return retainedSlice(index, writerIndex() - index);
}

@Override
public ByteBuf slice(int index, int length) {
// All reference count methods should be inherited from this object (this is the "parent").
return new PooledNonRetainedSlicedByteBuf(this, unwrap(), index, length);
}

final ByteBuf duplicate0() {
// All reference count methods should be inherited from this object (this is the "parent").
return new PooledNonRetainedDuplicateByteBuf(this, unwrap());
}

private static final class PooledNonRetainedDuplicateByteBuf extends UnpooledDuplicatedByteBuf {
private final ReferenceCounted referenceCountDelegate;

PooledNonRetainedDuplicateByteBuf(ReferenceCounted referenceCountDelegate, AbstractByteBuf buffer) {
super(buffer);
this.referenceCountDelegate = referenceCountDelegate;
}

@Override
int refCnt0() {
return referenceCountDelegate.refCnt();
}

@Override
ByteBuf retain0() {
referenceCountDelegate.retain();
return this;
}

@Override
ByteBuf retain0(int increment) {
referenceCountDelegate.retain(increment);
return this;
}

@Override
ByteBuf touch0() {
referenceCountDelegate.touch();
return this;
}

@Override
ByteBuf touch0(Object hint) {
referenceCountDelegate.touch(hint);
return this;
}

@Override
boolean release0() {
return referenceCountDelegate.release();
}

@Override
boolean release0(int decrement) {
return referenceCountDelegate.release(decrement);
}

@Override
public ByteBuf duplicate() {
return new PooledNonRetainedDuplicateByteBuf(referenceCountDelegate, this);
}

@Override
public ByteBuf retainedDuplicate() {
return PooledDuplicatedByteBuf.newInstance(unwrap(), this, readerIndex(), writerIndex());
}

@Override
public ByteBuf slice(int index, int length) {
checkIndex0(index, length);
return new PooledNonRetainedSlicedByteBuf(referenceCountDelegate, unwrap(), index, length);
}

@Override
public ByteBuf retainedSlice() {
// Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
return retainedSlice(readerIndex(), capacity());
}

@Override
public ByteBuf retainedSlice(int index, int length) {
return PooledSlicedByteBuf.newInstance(unwrap(), this, index, length);
}
}

private static final class PooledNonRetainedSlicedByteBuf extends UnpooledSlicedByteBuf {
private final ReferenceCounted referenceCountDelegate;

PooledNonRetainedSlicedByteBuf(ReferenceCounted referenceCountDelegate,
AbstractByteBuf buffer, int index, int length) {
super(buffer, index, length);
this.referenceCountDelegate = referenceCountDelegate;
}

@Override
int refCnt0() {
return referenceCountDelegate.refCnt();
}

@Override
ByteBuf retain0() {
referenceCountDelegate.retain();
return this;
}

@Override
ByteBuf retain0(int increment) {
referenceCountDelegate.retain(increment);
return this;
}

@Override
ByteBuf touch0() {
referenceCountDelegate.touch();
return this;
}

@Override
ByteBuf touch0(Object hint) {
referenceCountDelegate.touch(hint);
return this;
}

@Override
boolean release0() {
return referenceCountDelegate.release();
}

@Override
boolean release0(int decrement) {
return referenceCountDelegate.release(decrement);
}

@Override
public ByteBuf duplicate() {
// Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
final ByteBuf duplicate = slice(0, capacity());
duplicate.setIndex(readerIndex(), writerIndex());
return duplicate;
}

@Override
public ByteBuf retainedDuplicate() {
return PooledDuplicatedByteBuf.newInstance(unwrap(), this, readerIndex(), writerIndex());
}

@Override
public ByteBuf slice(int index, int length) {
checkIndex0(index, length);
return new PooledNonRetainedSlicedByteBuf(referenceCountDelegate, unwrap(), idx(index), length);
}

@Override
public ByteBuf retainedSlice() {
// Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
return retainedSlice(0, capacity());
}

@Override
public ByteBuf retainedSlice(int index, int length) {
return PooledSlicedByteBuf.newInstance(unwrap(), this, idx(index), length);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected PooledDuplicatedByteBuf newObject(Handle<PooledDuplicatedByteBuf> hand
static PooledDuplicatedByteBuf newInstance(AbstractByteBuf unwrapped, ByteBuf wrapped,
int readerIndex, int writerIndex) {
final PooledDuplicatedByteBuf duplicate = RECYCLER.get();
duplicate.init(unwrapped, wrapped, readerIndex, writerIndex, wrapped.maxCapacity());
duplicate.init(unwrapped, wrapped, readerIndex, writerIndex, unwrapped.maxCapacity());
duplicate.markReaderIndex();
duplicate.markWriterIndex();

Expand Down Expand Up @@ -88,13 +88,15 @@ public ByteBuf copy(int index, int length) {
}

@Override
public ByteBuf slice(int index, int length) {
return unwrap().slice(index, length);
public ByteBuf retainedSlice(int index, int length) {
return PooledSlicedByteBuf.newInstance(unwrap(), this, index, length);
}

@Override
public ByteBuf retainedSlice(int index, int length) {
return PooledSlicedByteBuf.newInstance(unwrap(), this, index, length);
public ByteBuf duplicate() {
ByteBuf duplicate = duplicate0();
duplicate.setIndex(readerIndex(), writerIndex());
return duplicate;
}

@Override
Expand Down
11 changes: 4 additions & 7 deletions buffer/src/main/java/io/netty/buffer/PooledSlicedByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public ByteBuf copy(int index, int length) {
@Override
public ByteBuf slice(int index, int length) {
checkIndex0(index, length);
return unwrap().slice(idx(index), length);
return super.slice(idx(index), length);
}

@Override
Expand All @@ -113,18 +113,15 @@ public ByteBuf retainedSlice(int index, int length) {

@Override
public ByteBuf duplicate() {
// Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
final ByteBuf duplicate = unwrap().slice(adjustment, capacity());
duplicate.setIndex(readerIndex(), writerIndex());
ByteBuf duplicate = duplicate0();
duplicate.setIndex(idx(readerIndex()), adjustment + capacity());
return duplicate;
}

@Override
public ByteBuf retainedDuplicate() {
// Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
final ByteBuf duplicate = retainedSlice(0, capacity());
duplicate.setIndex(readerIndex(), writerIndex());
return duplicate;
return PooledDuplicatedByteBuf.newInstance(unwrap(), this, idx(readerIndex()), adjustment + capacity());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* {@link DuplicatedByteBuf} implementation that can do optimizations because it knows the duplicated buffer
* is of type {@link AbstractByteBuf}.
*/
final class UnpooledDuplicatedByteBuf extends DuplicatedByteBuf {
class UnpooledDuplicatedByteBuf extends DuplicatedByteBuf {
UnpooledDuplicatedByteBuf(AbstractByteBuf buffer) {
super(buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
* A special {@link AbstractUnpooledSlicedByteBuf} that can make optimizations because it knows the sliced buffer is of
* type {@link AbstractByteBuf}.
*/
final class UnpooledSlicedByteBuf extends AbstractUnpooledSlicedByteBuf {

class UnpooledSlicedByteBuf extends AbstractUnpooledSlicedByteBuf {
UnpooledSlicedByteBuf(AbstractByteBuf buffer, int index, int length) {
super(buffer, index, length);
}
Expand Down
Loading

0 comments on commit af2e77f

Please sign in to comment.