ByteToMessageDecoder avoid using discardSomeReadBytes() #9850

Closed
wants to merge 1 commit into from

Conversation

@Scottmitch (Member):

Motivation:
ByteToMessageDecoder maintains a ByteBuf to aggregate data in the event
a full message requires multiple socket read calls (aka the cumulator).
The cumulator buffer may grow over time and so ByteToMessageDecoder
periodically calls discardSomeReadBytes() in an attempt to reclaim
unused space and compact the buffer. Calling discardSomeReadBytes()
will modify the underlying buffer and if the application has any
views into the buffer (e.g. slice().retain()) their view would be
corrupted, and therefore the refCnt() must be 1 in order for
discardSomeReadBytes() to be called on the buffer. However if an
application habitually processes the data on another thread this
strategy may mean that discardSomeReadBytes() cannot be called in a
timely manner due to the reference count not being 1 and the buffer may
grow relatively large.

Modifications:

  • Instead of basing the buffer space reclamation on if the buffer is
    "shared", base the criteria on how much "wasted" space exists in the
    accumulation buffer. The default policy is if the buffer is at least
    half "wasted" space based upon its original capacity then reclaim
    space.
  • Instead of using ByteBuf#discardSomeReadBytes() to reclaim space,
    allocate a new buffer and swap the old buffer. This avoids any risk of
    existing views on the data being corrupted.
  • Trigger points are added (but made private) such that base classes
    can maintain indexes on the cumulation data and reset these indexes
    when the underlying cumulation buffer changes. This can be useful to
    avoid re-parsing data if partial data is parsed before it is all
    received. These methods can be made public later if/when they are used.

Result:
ByteToMessageDecoder reclaims cumulation buffer space according to
wasted space instead of if the buffer is shared.
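The proposed policy and buffer swap can be sketched as follows. This is a hypothetical simplification with plain byte arrays standing in for ByteBuf; the class and method names are illustrative, not the actual Netty API.

```java
// Minimal sketch of the proposed reclamation policy, assuming plain byte
// arrays in place of ByteBuf. Names are illustrative, not Netty API.
class WastedSpaceCumulation {
    byte[] buf;
    int readerIndex; // bytes already consumed by decode()
    int writerIndex; // end of valid (unread) data

    WastedSpaceCumulation(int capacity) {
        buf = new byte[capacity];
    }

    int readableBytes() {
        return writerIndex - readerIndex;
    }

    // Default policy: reclaim when at least half the capacity is "wasted"
    // (already-read) space.
    boolean shouldReclaim() {
        return readerIndex >= buf.length >>> 1;
    }

    // Swap to a freshly allocated buffer instead of compacting in place, so
    // existing views (e.g. retained slices) of the old buffer are never
    // mutated underneath their owners.
    void reclaim() {
        int readable = readableBytes();
        byte[] fresh = new byte[buf.length];
        System.arraycopy(buf, readerIndex, fresh, 0, readable);
        buf = fresh; // old array is left intact for any outstanding views
        readerIndex = 0;
        writerIndex = readable;
    }
}
```

Note the deliberate trade-off: the swap costs an allocation and a copy, but removes any dependence on refCnt() to decide whether compaction is safe.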

@normanmaurer (Member):

@Scottmitch there was a leak reported:

07:29:35.901 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
        io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:592)
        io.netty.handler.codec.ByteToMessageDecoder.swapCumulation(ByteToMessageDecoder.java:393)
        io.netty.handler.codec.ByteToMessageDecoder.tryDiscardSomeReadBytes(ByteToMessageDecoder.java:373)
        io.netty.handler.codec.ByteToMessageDecoder.channelReadComplete(ByteToMessageDecoder.java:329)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:408)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:395)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelReadComplete(AbstractChannelHandlerContext.java:388)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelReadComplete(DefaultChannelPipeline.java:1427)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:408)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:395)
        io.netty.channel.DefaultChannelPipeline.fireChannelReadComplete(DefaultChannelPipeline.java:937)
        io.netty.channel.embedded.EmbeddedChannel.flushInbound(EmbeddedChannel.java:385)
        io.netty.channel.embedded.EmbeddedChannel.writeInbound(EmbeddedChannel.java:346)
        io.netty.handler.codec.compression.SnappyFrameDecoderTest.testInvalidChecksumThrowsException(SnappyFrameDecoderTest.java:152)
        java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.base/java.lang.reflect.Method.invoke(Method.java:566)
        org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
        org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
        org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
        org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
        org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
        org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
        org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
        org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
        org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
Created at:
        io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
        io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
        io.netty.handler.codec.ByteToMessageDecoder.allocateNewByteBuf(ByteToMessageDecoder.java:415)
        io.netty.handler.codec.ByteToMessageDecoder.swapCumulation(ByteToMessageDecoder.java:391)
        io.netty.handler.codec.ByteToMessageDecoder.tryDiscardSomeReadBytes(ByteToMessageDecoder.java:373)
        io.netty.handler.codec.ByteToMessageDecoder.channelReadComplete(ByteToMessageDecoder.java:329)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:408)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:395)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelReadComplete(AbstractChannelHandlerContext.java:388)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelReadComplete(DefaultChannelPipeline.java:1427)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:408)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:395)
        io.netty.channel.DefaultChannelPipeline.fireChannelReadComplete(DefaultChannelPipeline.java:937)
        io.netty.channel.embedded.EmbeddedChannel.flushInbound(EmbeddedChannel.java:385)
        io.netty.channel.embedded.EmbeddedChannel.writeInbound(EmbeddedChannel.java:346)
        io.netty.handler.codec.compression.SnappyFrameDecoderTest.testInvalidChecksumThrowsException(SnappyFrameDecoderTest.java:152)
        java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.base/java.lang.reflect.Method.invoke(Method.java:566)
        org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
        org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
        org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
        org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
        org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
        org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
        org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
        org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
        org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
[INFO] Tests run: 14, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.324 s - in io.netty.handler.codec.compression.SnappyIntegrationTest

@normanmaurer (Member) left a comment:

LGTM... only one comment, which I think is also the reason for the reported leak (mainly caused by an error in the SnappyFrameDecoderTest that did not show up before, because we previously did not allocate a new buffer when there was nothing left to read).

// of this cumulation, and if we use discardSomeReadBytes the user code will see corrupted data. If we attempt
// to check the ReferenceCount == 1 to see if discardSomeReadBytes is safe to call this may result in the
// cumulation buffer growing very large in the case the asynchronous access pattern is persistent.
if (cumulation != null && cumulation.readerIndex() >= cumulation.capacity() >>> 1) {
@normanmaurer (Member):

@Scottmitch should we just set cumulation to null if cumulation.isReadable() == false ? I think there is no need to do allocate anything in this case.

@Scottmitch (Member, Author):

This condition is checked in channelRead() and the behavior would be different from what was done in discardSomeReadBytes(), so I'm not convinced it is necessary here. This may benefit calls from channelReadComplete(), but presumably the channel read finally block should catch the condition.

@normanmaurer (Member):

I still think it can’t harm

@Scottmitch (Member, Author):

Agreed it won't harm, but it is redundant (unless I'm missing something). channelReadComplete() generally doesn't consume data, which I assume is why it didn't previously need to account for !cumulation.isReadable() (the condition would be caught in the prior channelRead() call, see below). I updated the javadoc to clarify that the method may null out the cumulation, to make clear this is allowed; but can you clarify which use case you had in mind where we would not clear/reset the cumulation?

// channelRead() code:
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    releaseCumulation(); //-> release the cumulation here if it has been fully drained
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    tryDiscardSomeReadBytes(ctx.alloc());
                }

@njhill (Member) commented Dec 6, 2019:

Hey thanks @Scottmitch. I have coincidentally been looking at this area a lot lately and am iterating on some possibly-related streamlining (taken longer than anticipated). So I have quite a few questions/comments already, but probably still need to digest this proposal a bit more.

For now though I'm struggling a bit to understand the premise. In cases that the cumulation buffer is being referenced/used elsewhere (e.g. retained slices passed to other threads in prior decode calls), discarding does not even come into play. The in-place discarding in its current form is there just to prevent uncontrolled growth/waste of the cumulation buffer in cases where references to it aren't retained. It doesn't apply to the shared cases since when new data comes in, remaining data will be copied to the front of a newly allocated buffer (and thus "naturally" discarding) before releasing the old one. What am I missing? :)
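The shared-case behavior described above can be sketched roughly like this. It is a simplification with plain arrays standing in for ByteBuf, and an illustration of the idea rather than the actual MERGE_CUMULATOR code.

```java
// Rough sketch of the shared-cumulation path: when the buffer may be
// shared, appending goes through an expand-and-copy step that moves only
// the unread bytes to the front of a new buffer, so already-read bytes are
// "naturally" discarded. Not the actual Netty code.
class MergeSketch {
    static byte[] cumulateWhenShared(byte[] cumulation, int readerIndex,
                                     int writerIndex, byte[] in) {
        int readable = writerIndex - readerIndex;
        byte[] expanded = new byte[readable + in.length];
        // Unread bytes land at index 0: the read prefix is dropped.
        System.arraycopy(cumulation, readerIndex, expanded, 0, readable);
        System.arraycopy(in, 0, expanded, readable, in.length);
        return expanded; // the caller releases the old (shared) buffer
    }
}
```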

normanmaurer added a commit that referenced this pull request Dec 6, 2019
…DecoderTest`

Motivation:

We did not correctly close the `EmbeddedChannel`, which meant `handlerRemoved(...)` was not called. This can lead to leaks. Besides this, we also did not correctly consume produced data, which could also show up as a leak.

Modifications:

- Always call `EmbeddedChannel.finish()`
- Ensure we consume all produced data and release it

Result:

No more leaks in test. This showed up in #9850 (comment).
@normanmaurer (Member):

Leak itself should be fixed by #9851

@Scottmitch (Member, Author):

@njhill - Agreed if retain() is called in decode(..) the discard shouldn't occur. The "reclaim in place" approach is nice, but basing it on reference counting has some potential side effects:

  • Async execution and cumulation growth - in the case that the application is processing data on another thread it may result in continuous growth of the cumulation buffer (because the buffer is always retained). The growth will stop if the cumulation becomes not readable, but this depends upon how data is received and the protocol decode logic.
  • Allocators that generate "unreleasable" buffers. Reference counting is hard, and some applications may be willing to trade performance for reduced complexity and ignore reference counting. In this case, checking reference counts to drive changes to the underlying buffer's content/indexes may be disruptive to the application. One could argue this is abusing the ReferenceCount API (FWIW it is done in UnreleasableByteBuf), but reference counting's primary purpose is to more quickly/deterministically reclaim memory, and inferring if a buffer is shared is overloading the primary purpose of the API.

The question is: do we lose much by allocating a new buffer (considering the default allocator is pooled) in the event that compaction is triggered, which is relatively rare in most supported protocols and only occurs if data delivery is carefully segmented?

when new data comes in, remaining data will be copied to the front of a newly allocated buffer (and thus "naturally" discarding) before releasing the old one

When new data comes in, if there is data in the cumulation, then this new data will be copied to the cumulation (at the position of the writer index). Can you clarify the scenario you had in mind?

@normanmaurer (Member):

@Scottmitch can you rebase ?

@njhill (Member) commented Dec 9, 2019:

Thanks @Scottmitch

When new data comes in, if there is data in the cumulation, then this new data will be copied to the cumulation (at the position of the writer index). Can you clarify the scenario you had in mind?

We are talking about the case where refCnt > 1, right? In this case the cumulator will always call expandCumulation which copies to a newly allocated buffer. In other words I’m not sure your first side-effect bullet is valid.

Allocators that generate "unreleasable" buffers. Reference counting is hard and some applications may be willing to trade performance for reduced complexity and ignore reference counting.

I'm not sure that using unreleasable buffers is the best way to achieve this though. As far as I can see, servicetalk has a thread safety exposure related to the above - if the cumulator writes beyond the buffer's current capacity then an internal realloc will be performed which won't be safely published to other threads.

Would it not be simpler to either:

  • Forgo all of the custom UnreleasableXxByteBufs altogether and instead use netty's unpooled ByteBufs as-is, then just ensure that they are always retain()ed prior to passing to any user code (I guess at the point they are wrapped in a servicetalk buffer?)

or

  • Keep them and just add a call to super.retain() to their constructors so that they have a fixed ref count of 2 instead of 1.

The first has the advantage of less custom code and allows for explicit release of (direct) buffers in cases where they aren't passed to user code. The second avoids retain/release volatile accesses. But both fix the mentioned issue and should avoid the need for this or any other change on the netty side. WDYT?

@Scottmitch (Member, Author):

In other words I’m not sure your first side-effect bullet is valid.

In this scenario the reference count is always 1 (due to unreleasable buffers). But yes, if reference counts vary it depends upon the timing of release by the other threads and when that release becomes visible to the EventLoop thread; and yes, it is less likely due to the ref count check in the Cumulator implementations.

Forgo all of the custom UnreleasableXxByteBufs … just ensure that they are always retain()ed prior to passing to any user code.

This will not work. If ref counting isn't exposed to users you need to avoid the case where Netty releases the buffers (e.g. when writing). Otherwise you end up getting IllegalRefCountExceptions if you write a buffer and re-use it. So at some level unreleasable is necessary.

Keep them and just add a call to super.retain() to their constructors so that they have a fixed ref count of 2 instead of 1.

Representing the ref cnt as >1 was considered, and may be a viable option (more investigation required). However we are still taking liberties with the APIs, as using the reference count interface to determine “is this buffer shared” does not provide the expected results (the buffer may or may not actually be shared). If this was done the functionality should be “OK” in this case AFAICT, but do we need this optimization here, and is this a pattern we want to encourage in Netty? Note that Netty’s UnreleasableByteBuf is subject to the same issue.

then an internal realloc will be performed which won't be safely published to other threads.

I'd like to dig into this to understand the scenario you had in mind. IIUC here is the scenario you are describing:

EventLoop Thread (ELT): read data {full frame, half frame}, ByteToMessageDecoder sets cumulator, decodes a frame for the protocol (e.g. h2) via slice/duplicate, half frame is left in cumulation
Application Thread (AT): gets frame, and starts using content
ELT: read data {half frame}, cumulation.writeBytes(data), cumulation needs to be resized to accommodate extra data
AT: <what does the application see?>

The AT has its own slice, so the indexes/capacity are not shared. The underlying data is shared, but can the AT see partial data or a corrupted form of the data during the copy? After the data is parsed, the parsed section of data is not modified. The resize operation involves allocating new memory, copying from the old memory, and assigning the data reference to the new memory; the AT may see either the old memory or the new memory. However, the memory is reclaimed by GC (ref counting is not reclaiming the memory; thanks for making me investigate in more detail, apple/servicetalk#893 :) ), the relative indexes into the buffer from any slices should remain consistent because the data isn’t moving (just extending the capacity), and the content of the parsed data will not be modified. IIUC the AT should see the same contents in memory either way; can you clarify if I’m missing something?

@njhill (Member) commented Dec 11, 2019:

In this scenario reference count is always 1 (due to unreleasable)

Right, but this scenario involves a specific approach to a specific use-case (namely the particular way custom unreleasable bufs are being used to shield users from ref counting), whereas the motivation in the PR description and explanatory code comments imply the changes benefit standard netty (ref-counting based) usage. As far as I can see the changes are only in support of this niche case, to the possible detriment of other/existing cases, while introducing a public API change that must then be supported going forward and might inhibit our ability to make other optimizations here (the class is quite central, as you know).

we are still taking liberties with the APIs as using the reference count interface to determine “is this buffer shared” does not provide the expected results (e.g. the buffer may or may not be shared)

I think in this case it's reasonable and semantically consistent to consider the buffer "permanently" shared, or to just think of netty's refCnt > 1 check to mean "possibly shared". That said it would still be my second-choice option.

If this was done the functionality should be “OK” in this case AFAICT, but do we need this optimization here

Which optimization where? If this was done then the need for any change on the netty side (e.g. this PR) goes away.

This will not work. If ref counting isn't exposed to users you need to avoid the case where Netty releases the buffers (e.g. when writing). Otherwise you end up getting IllegalRefCountExceptions if you write a buffer and re-use it. So at some level unreleasable is necessary.

I have not looked deeply at the servicetalk code but can't imagine why this couldn't work in principle. When control of a ByteBuf passes to the user then it's wrapped in a ST buffer and so that ownership stake will remain indefinitely. If a ByteBuf is extracted from a ST buffer passed from the user, you can just call retain() on it at that point - effectively preserving their stake rather than giving it up. Alternatively you could wrap the ByteBuf in a netty unreleasable buf at the same time that it's wrapped in an ST buf, or even do this lazily if/when it's ever converted back to a ByteBuf (safe publication wouldn't be a problem for this due to final field guarantees). Or is there something I got wrong about the servicetalk mechanics ... do users ever deal with netty ByteBufs directly?

IIUC the AT should see the same contents in memory either way, can you clarify if I’m missing something?

The indices are fine, it's that the newly allocated/copied mem (e.g. byte array for heap buffer) is not safely published - specifically AT might see the buffer's updated ref to the new array before it sees that array's newly copied contents.
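The publication hazard being described can be sketched as follows. This is illustrative only: the class name is invented and this is not ServiceTalk code.

```java
// Sketch of the unsafe-publication hazard: `data` is a plain field, so when
// a resize reallocates and reassigns it, a reader on another thread may see
// the new array reference before it sees the copied contents (no
// happens-before edge). Declaring the field volatile, or confining resizes
// to a single thread, would close the gap. Names are made up.
class GrowableBuffer {
    byte[] data; // plain (non-volatile, non-final) field: unsafe publication
    int writerIndex;

    GrowableBuffer(int capacity) {
        data = new byte[capacity];
    }

    void write(byte b) {
        if (writerIndex == data.length) {
            byte[] bigger = new byte[data.length * 2];
            System.arraycopy(data, 0, bigger, 0, data.length);
            data = bigger; // racy publication to any concurrent reader
        }
        data[writerIndex++] = b;
    }
}
```

Single-threaded the code is fine; the problem only appears when another thread reads `data` without synchronization while a resize is in flight.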

apple/servicetalk#893 by itself doesn't fix this, and wouldn't be necessary at all if one of the other approaches was taken.

Really sorry for the pushback; I just still can't see how this is a positive change on the netty side, especially given that there appear to be simpler ways to achieve the desired outcome outside of netty. And even more sorry if there's something I still have missed or misunderstood! FWIW I do like the included minor simplifications to the cumulators' logic and the added use of calculateNewCapacity in expandCumulation.

@njhill (Member) commented Dec 11, 2019:

@Scottmitch I think independently of this it would be universally beneficial and safe (including to servicetalk usecase) to relax the cumulator check a bit to allow appending if it won't trigger a realloc. WDYT?

int required = in.readableBytes();
if (required > cumulation.maxWritableBytes()
        || (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1)
        || cumulation.isReadOnly()) {
    // expand here
}

I guess the only danger is if there is some decoder out there that retains a duplicate rather than a slice, and then proceeds to use the writable space for other purposes. Hard to think that this would be a real-life case though.

@Scottmitch (Member, Author):

Which optimization where? If this was done then the need for any change on the netty side (e.g. this PR) goes away

I'm referring to the "reclaim in place" behavior (existing Netty behavior), which is an optimization (assuming it is known whether the buffer is shared) over the "allocate and copy" approach (proposed in this PR).

I have not looked deeply at the servicetalk code but can't imagine why this couldn't work in principle.

What I meant to highlight here is because we don't expose reference counting to the users we cannot allow the reference count to get to 0 and the underlying memory be released until the memory is no longer referenced. The initial suggestion of "increment the ref cnt when passing a buffer to users" and “call super.retain() in the constructor” may lead to this result (e.g. if a user writes the same underlying buffer and retains a reference for use later).

An alternative approach to using unreleasable buffers, as you alluded to, is incrementing the reference count every time a buffer flows from Netty->ST[->User] and then also from [User->]ST->Netty (e.g. making sure all cases which may decrement the refcnt have a corresponding increment such that the count never reaches 0). Both directions are required: if the ref cnt isn’t incremented when going from Netty->ST then we would run into a similar issue to the one this PR is attempting to address (e.g. optimizations which try to infer whether a buffer is shared based upon the ref count break).

There are pros/cons to each approach. For example, “unreleasable buffers” reduces the surface area and removes reference counting from the equation, but non-memory-reclamation uses of the ref cnt API (e.g. “is the buffer shared”) may uncover less-exercised code paths (e.g. this discussion). “Manage ref counts on entry/exit points” uses refcounts in a more traditional fashion, but has a larger surface area, and cases may be missed as the code evolves.

Independent of this PR and discussion, in Netty it is possible to generate “unreleasable buffers” from a ByteBufAllocator (or generate/wrap them in a ChannelHandler), and Netty’s own UnreleasableByteBuf will not work with ByteToMessageDecoder. Here are the options I can think of:

  • Change UnreleasableByteBuf to report >1 from refCnt() (e.g. MAX_INT?). The refCnt() API becomes strange, but it is already somewhat abused due to incrementing/decrementing being disconnected. This may have other implications and more investigation is required. It is also subtle if others have their own forms of unreleasable buffers (how refCnt is expected to behave is somewhat ambiguous: you can see it is 1, but this won’t even work everywhere in Netty today).
  • Change ByteToMessageDecoder as suggested in this PR (as-is, the MERGE_CUMULATOR may still cause issues if the ByteBuf is accessed from different threads and a resize operation occurs). Stop using the refCnt to infer if a buffer is shared, lose the “reclaim memory in place” optimization, and substitute an “allocate and copy” approach.
  • Netty’s current behavior doesn’t change (e.g. UnreleasableByteBuf still doesn’t work by default with ByteBufAllocator) but add a new interface which extends ByteToMessageDecoder$Cumulation that has control over compaction of cumulation. Users can specialize behavior if necessary. See https://github.com/Scottmitch/netty/tree/byte_to_message_cumulator
  • Declare that UnreleasableByteBuf doesn’t work with ByteToMessageDecoder (which may extend to saying it is not safe to generate one from a ByteBufAllocator). Although unlikely to occur (e.g. directly from ByteBufAllocator, or potentially through ChannelHandler manipulation), this option seems undesirable. Issues related to this surface infrequently and the root cause can be difficult to track down (e.g. why is my content different). ByteToMessageDecoder also accommodates read-only buffers, so it seems strange not to be compatible with the current UnreleasableByteBuf.
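To make the "allocate and copy" option concrete, here is a toy model (not Netty code; all names are illustrative) of the reclamation policy this PR describes: once the already-read ("wasted") region reaches half of the buffer's original capacity, copy the unread bytes into a fresh array and swap it in, so any outstanding views of the old array are never corrupted.

```java
// Toy cumulation buffer sketching the copy-and-swap reclamation policy.
// This is NOT Netty's ByteBuf API; field and method names are hypothetical.
final class Cumulation {
    byte[] data;
    int readerIndex;   // bytes before this index have been consumed
    int writerIndex;   // bytes from this index on are free space
    final int originalCapacity;

    Cumulation(int capacity) {
        this.data = new byte[capacity];
        this.originalCapacity = capacity;
    }

    int readableBytes() {
        return writerIndex - readerIndex;
    }

    // Reclaim wasted space by allocating a new array and copying, never by
    // mutating in place, so existing views of the old array stay intact.
    boolean maybeReclaim() {
        if (readerIndex < originalCapacity / 2) {
            return false; // less than half wasted: leave the buffer alone
        }
        int readable = readableBytes();
        byte[] fresh = new byte[data.length];
        System.arraycopy(data, readerIndex, fresh, 0, readable);
        readerIndex = 0;
        writerIndex = readable;
        data = fresh; // old array (and any views of it) remain untouched
        return true;
    }
}
```

The trade-off versus `discardSomeReadBytes()` is an extra allocation and copy in exchange for never invalidating retained slices of the old buffer.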

specifically AT might see the buffer's updated ref to the new array before it sees that array's newly copied contents.

Thanks for clarifying! Yes, this is problematic.

Really sorry for the pushback...

No need to apologize, the discussion is helpful/welcome!

relax the cumulator check a bit to allow appending if it won't trigger a realloc … retains a duplicate rather than a slice

I guess the difference would be “ownership”: if we are assuming “refCnt() > 1” is a proxy for “this buffer is shared so ByteToMessageDecoder shouldn’t touch the data” then this may be problematic. Folks can always supply their own Cumulator if they want this behavior so I don’t think we need to change the default behavior.

@njhill
Member

njhill commented Dec 12, 2019

Thanks @Scottmitch, I agree this is good discussion! Stand by for another wall of text... :)

My perspective on the unreleasable buffer thing is a bit different. From a netty pov I would not consider there to be such a thing as an "unreleasable buffer". UnreleasableByteBuf is an internal class used only to implement Unpooled#unreleasableBuffer(...) which is documented as returning "an unreleasable view on the given ByteBuf which will just ignore release and retain calls". I see this just as a tool for quite specific uses where the implications of such use are understood in the context of otherwise ref-count aware logic. In particular if you are firing an unreleasable-wrapped ByteBuf into some arbitrary code that in theory might both update and/or depend on the refcnt value semantics, then this must be accounted for, e.g. here by calling retain() on the underlying buffer to pin its refcnt to > 1.

Really the precise count itself has no meaning to anyone inspecting a ByteBuf apart from whether it is 0, 1 or >1, (released, exclusive or shared). So in this sense if you were determined for the allocator to produce unreleasable bufs then having their ref counts pinned to 2 (or equivalently any other number >1) at the outset seems like the best option. This is what I was referring to with the suggestion of calling super.retain() in the servicetalk unreleasable ByteBuf impl constructors (it sounds like you may have interpreted slightly differently), but since these are internal to ST then performance-wise just overriding refCnt() might be preferable.
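The pinning idea above can be sketched with a toy model (these are not the real ByteBuf/UnreleasableByteBuf classes; names are illustrative): retain the wrapped buffer once at construction so its count is pinned >= 2 for the wrapper's lifetime, and any code using refCnt() > 1 as a "shared" check will leave the data alone.

```java
// Minimal stand-in for a reference-counted buffer (not Netty's ByteBuf).
final class RefCounted {
    private int refCnt = 1;
    int refCnt() { return refCnt; }
    RefCounted retain() { refCnt++; return this; }
    boolean release() { return --refCnt == 0; }
}

// Hypothetical unreleasable wrapper that pins the underlying count to >1
// at construction, as suggested above (cf. calling super.retain()).
final class UnreleasableView {
    private final RefCounted wrapped;
    UnreleasableView(RefCounted wrapped) {
        this.wrapped = wrapped;
        wrapped.retain(); // pin: count is now >= 2 for this view's lifetime
    }
    int refCnt() { return wrapped.refCnt(); }
    // retain()/release() on the view are no-ops by design.
    void retain() { }
    void release() { }
}
```

With the count pinned, a decoder's `refCnt() > 1` test correctly treats the buffer as shared and never compacts it in place.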

An alternative approach to using unreleasable buffers, as you alluded to, is incrementing the reference count every time a buffer flows from Netty->ST[->User] and then also from [User->]ST->Netty (e.g. making sure all cases which may decrement the refCnt have a corresponding increment such that the count never reaches 0). Either way this is required, because if the ref cnt isn’t incremented when going from Netty->ST then we would run into a similar issue to the one this PR is attempting to address

I actually don't think anything special is needed for the Netty->ST->User case. This is standard netty semantics - in general when you give a ByteBuf to someone then you are also transferring ownership i.e. they have the responsibility to release. It's just in this case it will be wrapped in a ST buffer and so never released, which is what we want. If anything continues to use the pre-wrapped buf on the ST/Netty side then it will have to have been retained first anyway, and so will necessarily have a refcnt >= 2.

So then all you need to be careful about is if/when a ByteBuf is extracted from a ST buffer. There are two cases - either the ByteBuf is accessed transiently (e.g. just to read/write some data), or it's kept/passed-along for some purpose. In the first case there is no problem, in the second you're essentially stealing ownership and so first need to call retain() to keep the user's (never-to-be-released) ownership stake intact.
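The two extraction cases above can be sketched with a toy model (the `Counted` type and the method names `peekUnderlying`/`takeOwnership` are hypothetical, standing in for ST's BufferUtil and the wrapped ByteBuf):

```java
// Minimal stand-in for a reference-counted buffer (not Netty's ByteBuf).
final class Counted {
    int refCnt = 1;
    Counted retain() { refCnt++; return this; }
    void release() { refCnt--; }
}

// Hypothetical extraction helpers, one per case described above.
final class BufferUtilSketch {
    // Case 1: transient access (just read/write some data) - the caller
    // must not keep the reference, so no refcount change is needed.
    static Counted peekUnderlying(Counted wrapped) {
        return wrapped;
    }

    // Case 2: the caller keeps/passes the buffer along, stealing an
    // ownership stake - retain() first so the user's never-to-be-released
    // stake stays intact, and the caller later release()s its own stake.
    static Counted takeOwnership(Counted wrapped) {
        return wrapped.retain();
    }
}
```

Funneling every extraction through two such methods keeps the surface area tiny, which is the maintainability point being made here.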

This does not seem like much of a burden, especially as the conversion is always done via the BufferUtil class (i.e. really a tiny surface area - you could have just two extraction methods, one for each of the above cases). My understanding was that part of the design premise of ST is to encapsulate the hard stuff so that users don't have to deal with it. This is just bread and butter ByteBuf handling that any Netty application has to do, and ST is a Netty application. To say that this would be too difficult to maintain, given that this is just ST internals in question and you guys are the maintainers, seems kind of odd to me!

FWIW I did a quick search of the ST code and found that this is in fact already done in most places, with only two methods where it would need to be added (CompositeBuffer#addBuffer and BufferHandler#write, two places in the latter). However for maintainability a small refactor would be prudent imho: rather than doing this retain() in multiple places, just move it to one of the two util methods that I alluded to above.

Declare that UnreleasableByteBuf doesn’t work with ByteToMessageDecoder

Again my view is that using Unpooled#unreleasableBuffer(...) to obtain a "protected" view of some ByteBuf is quite a specific tool, for use in "controlled" circumstances. To say it doesn't "work" with BTMD is kind of ambiguous imo. It depends how you are using it and what you are trying to do. It seems quite reasonable that you can't just wrap every ByteBuf like this at the source (without other consideration like making sure it's put in a "shared" state first via a call to retain()) and expect any/all existing code to work as-is.

Stop using refCnt to infer if a buffer is shared, lose the “reclaim memory in place” optimization, and substitute a “allocate and copy” approach.

I'm not in favour of this option but I still can't see why a dedicated compaction "allocate and copy" operation would ever be needed (e.g. the swapCumulation method in this PR). "Compaction" is only needed to control growth, and growth in this case involves moving to a new buffer anyhow which automatically compacts.

add a new interface which extends ByteToMessageDecoder$Cumulation that has control over compaction of cumulation.

This is actually part of what I've been iterating on for some time now (see #8931, #9843). I too think it makes complete sense for the cumulator to subsume responsibility for discarding, but don't see why a new subinterface would be needed for this - it's sufficient to only care about discarding when performing a cumulate() since this is the only time the buffer will grow (e.g. #8931 already does this, dispensing with discardReadBytes).

Finally, those referenced PRs are really precursors to a bigger optimization I have that should hopefully eliminate the need for any explicit discarding as a side effect. I had been trying to get it into at least a candidate "finished" state but maybe I'll share in a WIP state for feedback given how long it's taking.

@normanmaurer
Member

@njhill @Scottmitch this is a very interesting discussion and I still need to make my mind up here... That said I think I agree with @Scottmitch that using refCnt() > 1 in ByteToMessageDecoder is quite fragile and error-prone. Especially taking into account that reference counts may be incremented and released from multiple threads and delegated to wrapped buffers as well.

So whatever we do in the end I think we should get rid of the refCnt() > 1 check and just assume the buffer / ref count is shared.

@Scottmitch
Member Author

Scottmitch commented Dec 12, 2019

I actually don't think anything special is needed for the Netty->ST->User case ... It's just in this case it will be wrapped in a ST buffer and so never released, which is what we want

Seems like we are talking past each other here ... "wrapping" is mixing in another approach as opposed to incrementing the reference count. I think we both understand that writing a ByteBuf transfers ownership, and if the intention is to not have Netty reclaim the memory something must be done (e.g. doing nothing is not an option).

To say that this would be too difficult to maintain

To be clear, I didn't say this. I'm just pointing out the pros/cons of the different approaches. This PR is meant to facilitate discussion, and if the Netty community feels that the current ByteToMessageDecoder behavior is correct then we can adjust in ST. However based upon the primary usage of ref cnt (e.g. to release memory), and the current behavior of UnreleasableByteBuf, it is not clear the current behavior is safe.

FWIW I did a quick search of the ST code and found that this is in fact already done in most places

To be fair I expect the way ST handles refCnt is somewhat inconsistent due to the introduction of unreleasable buffers over time. It is certainly possible but more investigation is required.

To say it doesn't "work" with BTMD is kind of ambiguous imo ... without other consideration like making sure it's put in a "shared" state first via a call to retain()

UnreleasableByteBuf has its own reference count, and it cannot be changed, so even if the user called retain() on the inner buffer this wouldn't change the current behavior with BTMD. Requiring a retain() of the inner buffer to use UnreleasableByteBuf (which would then require a release() later) is not part of the expected contract today. Discussed with @njhill offline and I was mistaken here (was looking at ST 🤦‍♂): Netty's is wrapped and delegates.

"Compaction" is only needed to control growth, and growth in this case involves moving to a new buffer anyhow which automatically compacts.

Agreed, the buffer size doesn't need to be increased during compaction.

I too think it makes complete sense for the cumulator to subsume responsibility for discarding ... but don't see why a new subinterface would be needed for this ... #8931 already does this, dispensing with discardReadBytes

If there are other options that is great! I don't think #8931 will work "as is" because discardReadBytes is still called in channelReadComplete (by ByteToMessageDecoder, SslHandler, H2ConnectionHandler) so the Cumulator doesn’t have complete control. If we can get rid of this usage and have it be completely controlled by the Cumulator, that should be sufficient.

Remaining discussion (let's re-focus):

  • Is using refCnt() > 1 a pattern we want to rely upon in Netty to infer "this buffer may be shared" (is this used outside of BTMD?). I’m not necessarily against this, and if used in a controlled fashion it can be effective to reduce allocations, but it also doesn’t work for UnreleasableByteBuf. ST currently uses a similar approach to UnreleasableByteBuf, which is what got us here, but can be updated either way to reflect what is viewed as “correct” usage, so let's not focus on the ST details in this PR.
  • (followup) Can the accumulation and compaction strategy used by BTMD be made extensible? Currently only the accumulation strategy is, and the logic is split between Cumulator and BTMD (so it is difficult/impossible to override with the new H2 APIs). Independent of the refCnt() discussion, it would be useful to be able to have a strategy that uses stable buffers (e.g. avoid growing buffers, avoid moving data around).
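One hypothetical shape for that followup (not a Netty API; the interface and class names here are invented for illustration) is a single strategy that owns both appending and compaction, instead of splitting that logic between Cumulator and BTMD:

```java
// Hypothetical strategy interface: the decoder delegates both appending new
// data and deciding when/how to compact to one object.
interface CumulationStrategy {
    // Append `in` to the readable region [readerIndex, writerIndex) of
    // `cumulation`, returning the (possibly new) backing array.
    byte[] cumulate(byte[] cumulation, int readerIndex, int writerIndex, byte[] in);
}

// A "stable buffers" style implementation: always copy the unread bytes plus
// the new data into a fresh array, never mutating the old one in place, so
// previously handed-out views of the old array are never disturbed.
final class CopyingStrategy implements CumulationStrategy {
    @Override
    public byte[] cumulate(byte[] cumulation, int readerIndex, int writerIndex, byte[] in) {
        int readable = writerIndex - readerIndex;
        byte[] out = new byte[readable + in.length];
        System.arraycopy(cumulation, readerIndex, out, 0, readable);
        System.arraycopy(in, 0, out, readable, in.length);
        return out; // compaction falls out of cumulation automatically
    }
}
```

Because every cumulate() already produces a compacted buffer, no separate discardReadBytes-style hook is needed, which is the point made above about #8931.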

@normanmaurer
Member

Remaining discussion (let's re-focus):

Is using refCnt() > 1 a pattern we want to rely upon in Netty to infer "this buffer may be shared" (is this used outside of BTMD?). I’m not necessarily against this, and if used in a controlled fashion it can be effective to reduce allocations, but it also doesn’t work for UnreleasableByteBuf. ST currently uses a similar approach to UnreleasableByteBuf, which is what got us here, but can be updated either way to reflect what is viewed as “correct” usage, so let's not focus on the ST details in this PR.

After thinking a bit more about this I would prefer to not depend on this as it is error-prone, especially if retain / release calls are done in different threads.

(followup) Can the accumulation and compaction strategy used by BTMD be made extensible? Currently only the accumulation strategy is, and the logic is split between Cumulator and BTMD (so it is difficult/impossible to override with the new H2 APIs). Independent of the refCnt() discussion, it would be useful to be able to have a strategy that uses stable buffers (e.g. avoid growing buffers, avoid moving data around).

Are we talking about 4.x or next major here ?

@Scottmitch
Member Author

Discussed offline with @njhill. In summary...

  • Short term: close this PR in favor of ByteToMessageDecoder Cumulator improvements #9877 (ST will adjust its reference count strategy, no need for Netty to remove refCnt() > 1 checks and compaction-in-place code).
  • Medium/longer term: move away from Cumulation and propose a more ergonomic approach to accumulation designed to reduce the constant "copy into accumulation buffer" operations (assuming there is data left in the accumulation buffer after decode).

thanks for review/discussion @njhill / @normanmaurer !

@Scottmitch Scottmitch closed this Dec 13, 2019
@Scottmitch Scottmitch deleted the byte_to_message_discard branch December 13, 2019 06:00
@normanmaurer normanmaurer removed this from the 4.1.44.Final milestone Dec 18, 2019
ihanyong pushed a commit to ihanyong/netty that referenced this pull request Jul 31, 2020
…DecoderTest` (netty#9851)

Motivation:

We did not correctly close the `EmbeddedChannel` which would lead to not have `handlerRemoved(...)` called. This can lead to leaks. Beside this we also did not correctly consume produced data which could also show up as a leak.

Modifications:

- Always call `EmbeddedChannel.finish()`
- Ensure we consume all produced data and release it

Result:

No more leaks in test. This showed up in netty#9850 (comment).