-
Notifications
You must be signed in to change notification settings - Fork 357
add fragmentation support using new frames #585
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Robert Roeser <rroeserr@gmail.com>
| } catch (Throwable t) { | ||
| if (byteBuf != null) { | ||
| ReferenceCountUtil.safeRelease(byteBuf); | ||
| ReferenceCountUtil.safeRelease(payload); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this release as well as line 362? I can see this repeated below also, so assuming deliberate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can probably remove the payload one. I might have added it when I was chasing done some memory leaks.
| Objects.requireNonNull(delegate, "delegate must not be null"); | ||
| Objects.requireNonNull(allocator, "byteBufAllocator must not be null"); | ||
| if (mtu < MIN_MTU_SIZE) { | ||
| throw new IllegalArgumentException("smallest allowed mtu size is 64 bytes"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for a non hot path exception, worth referring to MIN_MTU_SIZE in the message instead of 64
| }) | ||
| .subscribe(); | ||
| private boolean shouldFragment(FrameType frameType, int readableBytes) { | ||
| if (frameType.isFragmentable()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tioli: more readable as a one liner
return frameType.isFragmentable() && readableBytes > mtu
Signed-off-by: Robert Roeser <rroeserr@gmail.com>
| return PayloadFrameFlyweight.encode( | ||
| allocator, streamId, true, true, false, metadataFragment, dataFragment); | ||
| default: | ||
| throw new IllegalStateException("unsupported fragment type"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: useful to include frameType in the message
| } | ||
| } | ||
| boolean follows = | ||
| metadata == Unpooled.EMPTY_BUFFER |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why check this? doesn't metadata.isReadable() do this?
| metadata = PayloadFrameFlyweight.metadata(frame); | ||
| break; | ||
| default: | ||
| throw new IllegalStateException("unsupported fragment type"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above include frameType
| * and Reassembly</a> | ||
| */ | ||
| final class FrameReassembler implements Disposable { | ||
| final class FrameReassembler extends AtomicBoolean implements Disposable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why extend AtomicBoolean instead of a field? memory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
something I picked up from rxjava
| ? metadata.retain() | ||
| : Unpooled.wrappedBuffer(this.metadata, metadata.retain()); | ||
| } catch (Throwable t) { | ||
| logger.error("error reassemble frame", t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| logger.error("error reassemble frame", t); | |
| logger.error("error reassembling frame", t); |
| return this.metadata == null | ||
| ? metadata.retain() | ||
| : Unpooled.wrappedBuffer(this.metadata, metadata.retain()); | ||
| } catch (Throwable t) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a very generic error to handle here when there doesn't seem to be a handoff to client code, i.e. it's either an encoding problem on the wire or a logic bug in this code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've gotten in the habit of doing this with publishers to maker sure exceptions don't slip by - its less about handling the error and more about making sure that I call sink.error less the exception disappears in the ether.
| public static String toString(ByteBuf frame) { | ||
| FrameType frameType = FrameHeaderFlyweight.frameType(frame); | ||
| int streamId = FrameHeaderFlyweight.streamId(frame); | ||
| StringBuilder payload = new StringBuilder(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
worth presizing? 16 default will require a realloc. Can be estimated pretty definitively.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe? this was kind of for debugging purposes, but I can add a size
| long requestN, | ||
| Payload payload) { | ||
|
|
||
| int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest asserting on negative values as a public util method
| int flags = 0; | ||
|
|
||
| if (metadata != null) { | ||
| if (metadata != null && Unpooled.EMPTY_BUFFER != metadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't EMPTY_BUFFER the "" case? and null is the case where flag should be unset?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I that is correct - that would explain the other thing I changed in the unit test - it was kind of early - I will go through and check the flyweights.
|
|
||
| @Override | ||
| public Mono<Void> send(Publisher<ByteBuf> frame) { | ||
| if (debugEnabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
being pedantic, shouldn't debug levels be able to change in a running app?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I set this way so that eventually it would get JIT'd away and you could only set it startup so you don't accidentally turn on frame logging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your call, it's not common.
|
|
||
| @Override | ||
| public ByteBuf sliceMetadata() { | ||
| return metadata == null ? Unpooled.EMPTY_BUFFER : metadata.slice(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether this implicit conversion from null to EMPTY_BUFFER could be confusing for the null/"" edge case
|
|
||
| private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; | ||
|
|
||
| @DisplayName("fragments and reassmbles data") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| @DisplayName("fragments and reassmbles data") | |
| @DisplayName("fragments and reassembles data") |
| @DisplayName("fragments and reassmbles data") | ||
| @Test | ||
| void fragmentAndReassembleData() { | ||
| System.out.println("original"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete?
| frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); | ||
|
|
||
| assertEquals("00000e0000000119000000000100000064", ByteBufUtil.hexDump(frame)); | ||
| assertEquals("00000b0000000118000000000164", ByteBufUtil.hexDump(frame)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this a bug?
| * Creates a new instance | ||
| * | ||
| * @param encodeLength indicates if this connection should encode the length or not. | ||
| * @param connection the {@link Connection} to for managing the server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"to for"?
yschimke
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
| return; | ||
| } | ||
|
|
||
| boolean hasFollows = FrameHeaderFlyweight.hasFollows(frame); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: there is a mention in a spec that follows flag should be ignored if complete flag is set
| CompositeByteBuf cd = removeData(streamId); | ||
| if (cd != null) { | ||
| ByteBuf d = PayloadFrameFlyweight.data(frame); | ||
| if (d != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: data is not nullable
| case NEXT_COMPLETE: | ||
| return PayloadFrameFlyweight.encode( | ||
| allocator, streamId, true, true, true, metadataFragment, dataFragment); | ||
| case COMPLETE: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this possible for first fragment? COMPLETE is synthetic type for frame not having Payload (data/metadata) - unlikely to be fragmented
| allocator, streamId, true, false, true, metadataFragment, dataFragment); | ||
| case NEXT_COMPLETE: | ||
| return PayloadFrameFlyweight.encode( | ||
| allocator, streamId, true, true, true, metadataFragment, dataFragment); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: spec mentions follows flag should be ignored if complete flag is set - wondering if complete flag should be deferred to last fragment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure - the spec is a little unclear around fragmentation. If we send it last we can't generate the header until the last frame, and the metadata is supposed to come complete before the data - so how do you read it? Also I don't sent next_complete frames in the fragments i send next frames. Maybe we should change the spec because it seems more complicated to defer this till the end.
| // Payload and synthetic types | ||
| case PAYLOAD: | ||
| return PayloadFrameFlyweight.encode( | ||
| allocator, streamId, true, false, false, metadataFragment, dataFragment); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
next should be set - cant have both next and complete false
Signed-off-by: Robert Roeser <rroeserr@gmail.com>
* add fragmentation support using new frames Signed-off-by: Robert Roeser <rroeserr@gmail.com> Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com>
The flags were introduced in rsocket#585 such that when fragmentation is enabled, it takes over the encoding and decoding of the frame length instead of the underlying transport. However this doesn't seem to have any obvious purpose. On the sending side, fragmentation breaks down the frames and then the transport can add the length just as well. On the receiving side, the transport is first anyway. It's better to simplify this and leave it to transports to decide this whether a length is needed and expected.
The flags were introduced in rsocket#585 such that when fragmentation is enabled, it takes over the encoding and decoding of the frame length instead of the underlying transport. However this doesn't seem to have any obvious purpose. On the sending side, fragmentation breaks down the frames and then the transport can add the length just as well. On the receiving side, the transport is first anyway. It's better to simplify this and leave it to transports to decide this whether a length is needed and expected. Signed-off-by: Rossen Stoyanchev <rstoyanchev@pivotal.io>
The flags were introduced in rsocket#585 such that when fragmentation is enabled, it takes over the encoding and decoding of the frame length instead of the underlying transport. However this doesn't seem to have any obvious purpose. On the sending side, fragmentation breaks down the frames and then the transport can add the length just as well. On the receiving side, the transport is first anyway. It's better to simplify this and leave it to transports to decide this whether a length is needed and expected. Signed-off-by: Rossen Stoyanchev <rstoyanchev@pivotal.io>
Signed-off-by: Robert Roeser rroeserr@gmail.com