New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Removes old buffer pool #16
Conversation
src/main/java/io/vlingo/wire/fdx/bidirectional/rsocket/RSocketClientChannel.java
Show resolved
Hide resolved
this.logger = logger; | ||
} | ||
|
||
private void handle(Payload payload) { | ||
final ByteBufferPool.PooledByteBuffer pooledBuffer = readBufferPool.accessFor("client-response", 25); | ||
final ConsumerByteBuffer pooledBuffer = readBufferPool.acquire(); | ||
try { | ||
final ByteBuffer payloadData = payload.getData(); | ||
final ConsumerByteBuffer put = pooledBuffer.put(payloadData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be avoided?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto above.
@@ -81,14 +83,13 @@ public void close() { | |||
} | |||
|
|||
public void consume(Payload request) { | |||
final ByteBufferPool.PooledByteBuffer pooledBuffer = readBufferPool.accessFor("client-request"); | |||
final ConsumerByteBuffer pooledBuffer = readBufferPool.acquire(); | |||
try { | |||
pooledBuffer.put(request.getData()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be avoided?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto above.
@@ -209,7 +202,7 @@ public void onSuccess() { | |||
|
|||
@Override | |||
public void onInput(final ByteBuffer decrypted) { | |||
final ConsumerByteBuffer buffer = readBufferPool.accessFor("ssl-read"); | |||
final ConsumerByteBuffer buffer = readBufferPool.acquire(); | |||
consumer.consume(buffer.put(decrypted).flip()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can put
be avoided?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean that the copy is unnecessary? Otherwise, what is the problem you see and how would you do it otherwise? BTW, I did not implement the RSocket support so I am a bit out of context here. Thanks for explaining a bit more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am concerned about the allocation overhead of copying potentially large buffers. There is a documented case here: https://github.com/buritos/vlingo-wire/blob/master/src/main/java/io/vlingo/wire/fdx/bidirectional/rsocket/RSocketClientChannel.java#L73, but I am not sure about the implications of not making a copy and also if it is the same case in all of these methods I mentioned in this PR. What I don't understand is how copying the buffer addresses "payload might not be sent immediately". I can of-course investigate, but it would be faster is someone could explain the intent. Also, how can we be sure that the original buffer fits in the buffer that we get from the pool? The way I would remove the copying is by adding a constructor in ConsumerByteBuffer
creates an instance from an existing ByteBuffer
, instead of allocating a new one. Let me know if you remember anything that would help me understand this code faster than reading it and what's your thoughts about my assumptions so far.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@buritos I like your idea of reusing a buffer, but I don't know if the copy-to buffer is owned by RSocket or vlingo. @alexguzun implemented the RSocket components so he can explain why this is done that way. He is in Lisbon and you can ping him on our team Slack workspace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi! The copy-to buffer ar owned by wire. They are required because unfortunatelly the rsocket and wire threads are separate meaning that a wire outbound might try to send the second message while the RSocket is still sending the first one. If we use the same buffer, the second message will overrite the first one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for explaining @alexguzun. I will spend some more time on this now that I clearly understand your motivation for copying. We might be able to get around this by pushing the acquisition of the buffers all the way up, reusing the buffers from the pool instead of holding on to particular instances. Does that make sense? Any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@buritos In general this looks like what we need. I will await your clarification on the put(...)
issues as you see them.
void activate() { | ||
active.set(true); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "PooledByteBuffer[id=" + id() + "]"; | ||
} | ||
|
||
@Override | ||
public void release() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@buritos Nice and simple.
src/main/java/io/vlingo/wire/fdx/bidirectional/rsocket/RSocketClientChannel.java
Show resolved
Hide resolved
@@ -209,7 +202,7 @@ public void onSuccess() { | |||
|
|||
@Override | |||
public void onInput(final ByteBuffer decrypted) { | |||
final ConsumerByteBuffer buffer = readBufferPool.accessFor("ssl-read"); | |||
final ConsumerByteBuffer buffer = readBufferPool.acquire(); | |||
consumer.consume(buffer.put(decrypted).flip()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean that the copy is unnecessary? Otherwise, what is the problem you see and how would you do it otherwise? BTW, I did not implement the RSocket support so I am a bit out of context here. Thanks for explaining a bit more.
@@ -81,14 +83,13 @@ public void close() { | |||
} | |||
|
|||
public void consume(Payload request) { | |||
final ByteBufferPool.PooledByteBuffer pooledBuffer = readBufferPool.accessFor("client-request"); | |||
final ConsumerByteBuffer pooledBuffer = readBufferPool.acquire(); | |||
try { | |||
pooledBuffer.put(request.getData()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto above.
this.logger = logger; | ||
} | ||
|
||
private void handle(Payload payload) { | ||
final ByteBufferPool.PooledByteBuffer pooledBuffer = readBufferPool.accessFor("client-response", 25); | ||
final ConsumerByteBuffer pooledBuffer = readBufferPool.acquire(); | ||
try { | ||
final ByteBuffer payloadData = payload.getData(); | ||
final ConsumerByteBuffer put = pooledBuffer.put(payloadData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto above.
Turns out that we are hitting the bug that was discovered: https://travis-ci.org/vlingo/vlingo-wire/builds/635019329#L2492 So happy we found this! Thanks for assigning the task @VaughnVernon ❤️ |
This reverts commit 62570e8.
@VaughnVernon, my theory is that all the copying can be eliminated with some rearrangement of the code that performs the release. My gut feeling is that the code is structured in this way (eager to release) because of the limitations of the previous pool implementation (failure to meet demand). Given the scalable nature of the new implementation (we pay the price for it), it is ok to hold on to a buffer instance for longer, meaning that it is ok to release it at the final consumer, as long as the producers do not attempt to reuse it in any way (they should always acquire another buffer from the pool). @alexguzun confirms (although a bit concerned about the rippling effects of changing the API for Let me know if you think that the performance gains from such change are worth it to prioritize the changes now, or something that you would like to see in the future, or not at all :) |
With the reverted test enablement is the bug is not fix, or you are working on this further? |
Also, we definitely need to fix this. The copying is *expensive * and in this wire API is used in multiple components, or could be. I think the current changes and bug fixes you have made and that we benefit from immediately should be merged today. The tuning should be entered as an issue and fixed soon, but not immediately. |
I would like to try the tests again after eliminating |
I've made an issue. You are welcome to merge this in its current state. I haven't checked upstream for buffer release in all the places I removed it from here. However, it is low risk due to the nature of the new pool (allocates as necessary). The only issue is that leaks interfere with scaling down the pool after bursts. |
Merge done. Could you elaborate why/how this happens? |
The InUse counter is not decremented, tricking the pool to think that there
is more demand than there actually is (leaked buffers are garbage
collected).
|
Got it, thanks. |
ByteBufferPool
;ConsumerByteBuffer
more than once.PoolAwareConsumerByteBuffer::release
is made idempotent and logs warning when it is called on the same instance more than once;ConsumerByteBuffer
when it is handed over to an async operation. The buffer is released on caught exceptions but not in afinally
block, since that would most likely run in parallel with the async operation.Possible issues with merging this PR for upstream:
ConsumerByteBuffer
leaks from the pool due to not being properly released in async operations implemented upstream. This is not too big of a deal since the new pool is unbounded and therefore leaked buffers will not cause an availability issue. However, leaking buffers interferes with the pool's ability to scale down and failure to release buffers will eventually cause memory issues (large footprint, more frequent GC). See inline comments for places where we should investigate.