From ac012eba189ae58b4f6914b7494c8e96f5c63e02 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 21 Aug 2018 11:13:52 +0300 Subject: [PATCH 1/2] fix: assumption about all payloads being processed synchronously is wrong --- .../java/io/rsocket/util/ByteBufPayload.java | 7 +- .../java/io/rsocket/util/DefaultPayload.java | 8 +- .../integration/InteractionsLoadTest.java | 106 ++++++++++++++++++ 3 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java diff --git a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java index d63454ba9..c1d9b0b9e 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java @@ -23,10 +23,11 @@ import io.netty.util.AbstractReferenceCounted; import io.netty.util.Recycler; import io.rsocket.Payload; + +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; -import javax.annotation.Nullable; public final class ByteBufPayload extends AbstractReferenceCounted implements Payload { private static final Recycler RECYCLER = @@ -168,8 +169,8 @@ public static Payload create(ByteBuf data) { public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) { ByteBufPayload payload = RECYCLER.get(); payload.setRefCnt(1); - payload.data = data; - payload.metadata = metadata; + payload.data = data.retain(); + payload.metadata = metadata == null ? Unpooled.EMPTY_BUFFER : metadata.retain(); return payload; } diff --git a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java index c53617e7c..8804e9438 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java @@ -155,6 +155,12 @@ public static Payload create(ByteBuffer data, @Nullable ByteBuffer metadata) { } public static Payload create(Payload payload) { - return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null); + return create(copy(payload.sliceData()), payload.hasMetadata() ? copy(payload.sliceMetadata()) : null); + } + + private static ByteBuffer copy(ByteBuf byteBuf) { + byte[] contents = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(contents); + return ByteBuffer.wrap(contents); } } diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java new file mode 100644 index 000000000..4996b8f88 --- /dev/null +++ b/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java @@ -0,0 +1,106 @@ +package io.rsocket.integration; + +import io.rsocket.AbstractRSocket; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.NettyContextCloseable; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.time.Duration; +import java.util.function.Supplier; + +public class InteractionsLoadTest { + + public static void main(String[] args) { + TcpServerTransport serverTransport = TcpServerTransport.create(0); + + NettyContextCloseable server = RSocketFactory.receive() + // .frameDecoder(ByteBufPayload::create) + .acceptor((setup, rsocket) -> Mono.just(new EchoRSocket())) + .transport(serverTransport) + .start() + .block(Duration.ofSeconds(10)); + + TcpClientTransport transport = TcpClientTransport.create(server.address()); + + RSocket client = RSocketFactory + .connect() + //.frameDecoder(ByteBufPayload::create) + .transport(transport).start() + .block(Duration.ofSeconds(10)); + + int concurrency = 16; + Flux.range(1, concurrency) + .flatMap(v -> + client.requestChannel( + input().map(iv -> + DefaultPayload.create("foo"))) + .limitRate(10000),concurrency) + .doOnNext(p -> { + String data = p.getDataUtf8(); + if (!data.equals("bar")) { + System.out.println("Channel Client Bad message: " + data); + } + //p.release(); + }) + .window(Duration.ofSeconds(1)) + .flatMap(Flux::count) + .doOnNext(d -> System.out.println("Got: " + d)) + .timeout(Duration.ofSeconds(525)) + .take(Duration.ofMinutes(2)) + .doOnTerminate(server::dispose) + .subscribe(); + + server.onClose().block(); + + } + + private static Flux input() { + Flux interval = Flux.interval(Duration.ofMillis(1)) + .onBackpressureDrop(); + for (int i = 0; i < 10; i++) { + interval = interval.mergeWith(interval); + } + return interval.publishOn(Schedulers.newSingle("bar")); + } + + private static class EchoRSocket extends AbstractRSocket { + @Override + public Flux requestChannel(Publisher payloads) { + return Flux.from(payloads).map(p -> { + + String data = p.getDataUtf8(); + if (!data.equals("foo")) { + System.out.println("Channel Server Bad message: " + data); + } + // p.release(); + return DefaultPayload.create(DefaultPayload.create("bar")); + }); + } + + @Override + public Flux requestStream(Payload payload) { + return Flux.just(payload) + .map(p -> { + String data = p.getDataUtf8(); + // p.release(); + return data; + }) + .doOnNext((data) -> { + if (!data.equals("foo")) { + System.out.println("Stream Server Bad message: " + data); + } + }).flatMap(data -> { + Supplier p = () -> DefaultPayload.create("bar"); + return Flux.range(1, 100).map(v -> p.get()); + }); + } + } +} From 5068d8bf568c17d1698639e805dc2b65e46cdafa Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 21 Aug 2018 18:44:32 +0300 Subject: [PATCH 2/2] convert to test --- .../integration/InteractionsLoadTest.java | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java index 4996b8f88..c43cefed5 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java @@ -4,25 +4,27 @@ import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; +import io.rsocket.test.SlowTest; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.NettyContextCloseable; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; +import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import java.time.Duration; import java.util.function.Supplier; public class InteractionsLoadTest { - public static void main(String[] args) { + @Test + @SlowTest + public void channel() { TcpServerTransport serverTransport = TcpServerTransport.create(0); NettyContextCloseable server = RSocketFactory.receive() - // .frameDecoder(ByteBufPayload::create) .acceptor((setup, rsocket) -> Mono.just(new EchoRSocket())) .transport(serverTransport) .start() @@ -32,7 +34,6 @@ public static void main(String[] args) { RSocket client = RSocketFactory .connect() - //.frameDecoder(ByteBufPayload::create) .transport(transport).start() .block(Duration.ofSeconds(10)); @@ -40,21 +41,20 @@ public static void main(String[] args) { Flux.range(1, concurrency) .flatMap(v -> client.requestChannel( - input().map(iv -> - DefaultPayload.create("foo"))) - .limitRate(10000),concurrency) + input().onBackpressureDrop().map(iv -> + DefaultPayload.create("foo"))) + .limitRate(10000), concurrency) + .timeout(Duration.ofSeconds(5)) .doOnNext(p -> { String data = p.getDataUtf8(); if (!data.equals("bar")) { - System.out.println("Channel Client Bad message: " + data); + throw new IllegalStateException("Channel Client Bad message: " + data); } - //p.release(); }) .window(Duration.ofSeconds(1)) .flatMap(Flux::count) .doOnNext(d -> System.out.println("Got: " + d)) - .timeout(Duration.ofSeconds(525)) - .take(Duration.ofMinutes(2)) + .take(Duration.ofMinutes(1)) .doOnTerminate(server::dispose) .subscribe(); @@ -68,7 +68,7 @@ private static Flux input() { for (int i = 0; i < 10; i++) { interval = interval.mergeWith(interval); } - return interval.publishOn(Schedulers.newSingle("bar")); + return interval; } private static class EchoRSocket extends AbstractRSocket { @@ -78,9 +78,8 @@ public Flux requestChannel(Publisher payloads) { String data = p.getDataUtf8(); if (!data.equals("foo")) { - System.out.println("Channel Server Bad message: " + data); + throw new IllegalStateException("Channel Server Bad message: " + data); } - // p.release(); return DefaultPayload.create(DefaultPayload.create("bar")); }); } @@ -90,12 +89,11 @@ public Flux requestStream(Payload payload) { return Flux.just(payload) .map(p -> { String data = p.getDataUtf8(); - // p.release(); return data; }) .doOnNext((data) -> { if (!data.equals("foo")) { - System.out.println("Stream Server Bad message: " + data); + throw new IllegalStateException("Stream Server Bad message: " + data); } }).flatMap(data -> { Supplier p = () -> DefaultPayload.create("bar");