diff --git a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java index d63454ba9..c1d9b0b9e 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java @@ -23,10 +23,11 @@ import io.netty.util.AbstractReferenceCounted; import io.netty.util.Recycler; import io.rsocket.Payload; + +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; -import javax.annotation.Nullable; public final class ByteBufPayload extends AbstractReferenceCounted implements Payload { private static final Recycler RECYCLER = @@ -168,8 +169,8 @@ public static Payload create(ByteBuf data) { public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) { ByteBufPayload payload = RECYCLER.get(); payload.setRefCnt(1); - payload.data = data; - payload.metadata = metadata; + payload.data = data.retain(); + payload.metadata = metadata == null ? Unpooled.EMPTY_BUFFER : metadata.retain(); return payload; } diff --git a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java index c53617e7c..8804e9438 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java @@ -155,6 +155,12 @@ public static Payload create(ByteBuffer data, @Nullable ByteBuffer metadata) { } public static Payload create(Payload payload) { - return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null); + return create(copy(payload.sliceData()), payload.hasMetadata() ? copy(payload.sliceMetadata()) : null); + } + + private static ByteBuffer copy(ByteBuf byteBuf) { + byte[] contents = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(contents); + return ByteBuffer.wrap(contents); } } diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java new file mode 100644 index 000000000..c43cefed5 --- /dev/null +++ b/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java @@ -0,0 +1,104 @@ +package io.rsocket.integration; + +import io.rsocket.AbstractRSocket; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.test.SlowTest; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.NettyContextCloseable; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.function.Supplier; + +public class InteractionsLoadTest { + + @Test + @SlowTest + public void channel() { + TcpServerTransport serverTransport = TcpServerTransport.create(0); + + NettyContextCloseable server = RSocketFactory.receive() + .acceptor((setup, rsocket) -> Mono.just(new EchoRSocket())) + .transport(serverTransport) + .start() + .block(Duration.ofSeconds(10)); + + TcpClientTransport transport = TcpClientTransport.create(server.address()); + + RSocket client = RSocketFactory + .connect() + .transport(transport).start() + .block(Duration.ofSeconds(10)); + + int concurrency = 16; + Flux.range(1, concurrency) + .flatMap(v -> + client.requestChannel( + input().onBackpressureDrop().map(iv -> + DefaultPayload.create("foo"))) + .limitRate(10000), concurrency) + .timeout(Duration.ofSeconds(5)) + .doOnNext(p -> { + String data = p.getDataUtf8(); + if (!data.equals("bar")) { + throw new IllegalStateException("Channel Client Bad message: " + data); + } + }) + .window(Duration.ofSeconds(1)) + .flatMap(Flux::count) + .doOnNext(d -> System.out.println("Got: " + d)) + .take(Duration.ofMinutes(1)) + .doOnTerminate(server::dispose) + .subscribe(); + + server.onClose().block(); + + } + + private static Flux input() { + Flux interval = Flux.interval(Duration.ofMillis(1)) + .onBackpressureDrop(); + for (int i = 0; i < 10; i++) { + interval = interval.mergeWith(interval); + } + return interval; + } + + private static class EchoRSocket extends AbstractRSocket { + @Override + public Flux requestChannel(Publisher payloads) { + return Flux.from(payloads).map(p -> { + + String data = p.getDataUtf8(); + if (!data.equals("foo")) { + throw new IllegalStateException("Channel Server Bad message: " + data); + } + return DefaultPayload.create(DefaultPayload.create("bar")); + }); + } + + @Override + public Flux requestStream(Payload payload) { + return Flux.just(payload) + .map(p -> { + String data = p.getDataUtf8(); + return data; + }) + .doOnNext((data) -> { + if (!data.equals("foo")) { + throw new IllegalStateException("Stream Server Bad message: " + data); + } + }).flatMap(data -> { + Supplier p = () -> DefaultPayload.create("bar"); + return Flux.range(1, 100).map(v -> p.get()); + }); + } + } +}