Skip to content

Commit

Permalink
enforces reassembly on the receiver side
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Apr 9, 2020
1 parent 8bdaee2 commit 7eb4882
Show file tree
Hide file tree
Showing 12 changed files with 424 additions and 255 deletions.
Expand Up @@ -34,7 +34,7 @@
import reactor.core.publisher.Mono;

/**
* A {@link DuplexConnection} implementation that fragments and reassembles {@link ByteBuf}s.
* A {@link DuplexConnection} implementation that fragments {@link ByteBuf}s.
*
* @see <a
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
Expand Down Expand Up @@ -138,23 +138,9 @@ private ByteBuf encode(ByteBuf frame) {
}
}

private ByteBuf decode(ByteBuf frame) {
if (encodeLength) {
return FrameLengthFlyweight.frame(frame).retain();
} else {
return frame;
}
}

@Override
public Flux<ByteBuf> receive() {
return delegate
.receive()
.handle(
(byteBuf, sink) -> {
ByteBuf decode = decode(byteBuf);
frameReassembler.reassembleFrame(decode, sink);
});
return delegate.receive();
}

@Override
Expand Down
@@ -0,0 +1,89 @@
/*
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.fragmentation;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameLengthFlyweight;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* A {@link DuplexConnection} implementation that reassembles {@link ByteBuf}s.
*
* @see <a
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
* and Reassembly</a>
*/
public final class ReassemblyDuplexConnection implements DuplexConnection {
private final DuplexConnection delegate;
private final FrameReassembler frameReassembler;
private final boolean decodeLength;

public ReassemblyDuplexConnection(
DuplexConnection delegate, ByteBufAllocator allocator, boolean decodeLength) {
Objects.requireNonNull(delegate, "delegate must not be null");
Objects.requireNonNull(allocator, "byteBufAllocator must not be null");
this.decodeLength = decodeLength;
this.delegate = delegate;
this.frameReassembler = new FrameReassembler(allocator);

delegate.onClose().doFinally(s -> frameReassembler.dispose()).subscribe();
}

@Override
public Mono<Void> send(Publisher<ByteBuf> frames) {
return delegate.send(frames);
}

@Override
public Mono<Void> sendOne(ByteBuf frame) {
return delegate.sendOne(frame);
}

private ByteBuf decode(ByteBuf frame) {
if (decodeLength) {
return FrameLengthFlyweight.frame(frame).retain();
} else {
return frame;
}
}

@Override
public Flux<ByteBuf> receive() {
return delegate
.receive()
.handle(
(byteBuf, sink) -> {
ByteBuf decode = decode(byteBuf);
frameReassembler.reassembleFrame(decode, sink);
});
}

@Override
public Mono<Void> onClose() {
return delegate.onClose();
}

@Override
public void dispose() {
delegate.dispose();
}
}
Expand Up @@ -22,13 +22,9 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.*;
import io.rsocket.util.DefaultPayload;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.Assert;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -91,216 +87,6 @@ void constructorNullDelegate() {
.withMessage("delegate must not be null");
}

@DisplayName("reassembles data")
@Test
void reassembleData() {
List<ByteBuf> byteBufs =
Arrays.asList(
RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)),
PayloadFrameFlyweight.encode(
allocator, 1, true, false, true, DefaultPayload.create(data)),
PayloadFrameFlyweight.encode(
allocator, 1, true, false, true, DefaultPayload.create(data)),
PayloadFrameFlyweight.encode(
allocator, 1, true, false, true, DefaultPayload.create(data)),
PayloadFrameFlyweight.encode(
allocator, 1, false, false, true, DefaultPayload.create(data)));

CompositeByteBuf data =
allocator
.compositeDirectBuffer()
.addComponents(
true,
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data));

when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs));
when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
.receive()
.as(StepVerifier::create)
.assertNext(
byteBuf -> {
Assert.assertEquals(data, RequestResponseFrameFlyweight.data(byteBuf));
})
.verifyComplete();
}

@DisplayName("reassembles metadata")
@Test
void reassembleMetadata() {
List<ByteBuf> byteBufs =
Arrays.asList(
RequestResponseFrameFlyweight.encode(
allocator,
1,
true,
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
PayloadFrameFlyweight.encode(
allocator,
1,
true,
false,
true,
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
PayloadFrameFlyweight.encode(
allocator,
1,
true,
false,
true,
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
PayloadFrameFlyweight.encode(
allocator,
1,
true,
false,
true,
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
PayloadFrameFlyweight.encode(
allocator,
1,
false,
false,
true,
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))));

CompositeByteBuf metadata =
allocator
.compositeDirectBuffer()
.addComponents(
true,
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata));

when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs));
when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
.receive()
.as(StepVerifier::create)
.assertNext(
byteBuf -> {
System.out.println(byteBuf.readableBytes());
ByteBuf m = RequestResponseFrameFlyweight.metadata(byteBuf);
Assert.assertEquals(metadata, m);
})
.verifyComplete();
}

@DisplayName("reassembles metadata and data")
@Test
void reassembleMetadataAndData() {
List<ByteBuf> byteBufs =
Arrays.asList(
RequestResponseFrameFlyweight.encode(
allocator,
1,
true,
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
PayloadFrameFlyweight.encode(
allocator,
1,
true,
false,
true,
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
PayloadFrameFlyweight.encode(
allocator,
1,
true,
false,
true,
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
PayloadFrameFlyweight.encode(
allocator,
1,
true,
false,
true,
DefaultPayload.create(
Unpooled.wrappedBuffer(data), Unpooled.wrappedBuffer(metadata))),
PayloadFrameFlyweight.encode(
allocator, 1, false, false, true, DefaultPayload.create(data)));

CompositeByteBuf data =
allocator
.compositeDirectBuffer()
.addComponents(
true,
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data));

CompositeByteBuf metadata =
allocator
.compositeDirectBuffer()
.addComponents(
true,
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata));

when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs));
when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
.receive()
.as(StepVerifier::create)
.assertNext(
byteBuf -> {
Assert.assertEquals(data, RequestResponseFrameFlyweight.data(byteBuf));
Assert.assertEquals(metadata, RequestResponseFrameFlyweight.metadata(byteBuf));
})
.verifyComplete();
}

@DisplayName("does not reassemble a non-fragment frame")
@Test
void reassembleNonFragment() {
ByteBuf encode =
RequestResponseFrameFlyweight.encode(
allocator, 1, false, DefaultPayload.create(Unpooled.wrappedBuffer(data)));

when(delegate.receive()).thenReturn(Flux.just(encode));
when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
.receive()
.as(StepVerifier::create)
.assertNext(
byteBuf -> {
Assert.assertEquals(
Unpooled.wrappedBuffer(data), RequestResponseFrameFlyweight.data(byteBuf));
})
.verifyComplete();
}

@DisplayName("does not reassemble non fragmentable frame")
@Test
void reassembleNonFragmentableFrame() {
ByteBuf encode = CancelFrameFlyweight.encode(allocator, 2);

when(delegate.receive()).thenReturn(Flux.just(encode));
when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
.receive()
.as(StepVerifier::create)
.assertNext(
byteBuf -> {
Assert.assertEquals(FrameType.CANCEL, FrameHeaderFlyweight.frameType(byteBuf));
})
.verifyComplete();
}

@DisplayName("fragments data")
@Test
void sendData() {
Expand Down

0 comments on commit 7eb4882

Please sign in to comment.