Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
import io.rsocket.Payload;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import javax.annotation.Nullable;

public final class ByteBufPayload extends AbstractReferenceCounted implements Payload {
private static final Recycler<ByteBufPayload> RECYCLER =
Expand Down Expand Up @@ -168,8 +169,8 @@ public static Payload create(ByteBuf data) {
public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
ByteBufPayload payload = RECYCLER.get();
payload.setRefCnt(1);
payload.data = data;
payload.metadata = metadata;
payload.data = data.retain();
payload.metadata = metadata == null ? Unpooled.EMPTY_BUFFER : metadata.retain();
return payload;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ public static Payload create(ByteBuffer data, @Nullable ByteBuffer metadata) {
}

public static Payload create(Payload payload) {
return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null);
return create(copy(payload.sliceData()), payload.hasMetadata() ? copy(payload.sliceMetadata()) : null);
}

private static ByteBuffer copy(ByteBuf byteBuf) {
byte[] contents = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(contents);
return ByteBuffer.wrap(contents);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package io.rsocket.integration;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.test.SlowTest;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.function.Supplier;

public class InteractionsLoadTest {

@Test
@SlowTest
public void channel() {
TcpServerTransport serverTransport = TcpServerTransport.create(0);

NettyContextCloseable server = RSocketFactory.receive()
.acceptor((setup, rsocket) -> Mono.just(new EchoRSocket()))
.transport(serverTransport)
.start()
.block(Duration.ofSeconds(10));

TcpClientTransport transport = TcpClientTransport.create(server.address());

RSocket client = RSocketFactory
.connect()
.transport(transport).start()
.block(Duration.ofSeconds(10));

int concurrency = 16;
Flux.range(1, concurrency)
.flatMap(v ->
client.requestChannel(
input().onBackpressureDrop().map(iv ->
DefaultPayload.create("foo")))
.limitRate(10000), concurrency)
.timeout(Duration.ofSeconds(5))
.doOnNext(p -> {
String data = p.getDataUtf8();
if (!data.equals("bar")) {
throw new IllegalStateException("Channel Client Bad message: " + data);
}
})
.window(Duration.ofSeconds(1))
.flatMap(Flux::count)
.doOnNext(d -> System.out.println("Got: " + d))
.take(Duration.ofMinutes(1))
.doOnTerminate(server::dispose)
.subscribe();

server.onClose().block();

}

private static Flux<Long> input() {
Flux<Long> interval = Flux.interval(Duration.ofMillis(1))
.onBackpressureDrop();
for (int i = 0; i < 10; i++) {
interval = interval.mergeWith(interval);
}
return interval;
}

private static class EchoRSocket extends AbstractRSocket {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads).map(p -> {

String data = p.getDataUtf8();
if (!data.equals("foo")) {
throw new IllegalStateException("Channel Server Bad message: " + data);
}
return DefaultPayload.create(DefaultPayload.create("bar"));
});
}

@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.just(payload)
.map(p -> {
String data = p.getDataUtf8();
return data;
})
.doOnNext((data) -> {
if (!data.equals("foo")) {
throw new IllegalStateException("Stream Server Bad message: " + data);
}
}).flatMap(data -> {
Supplier<Payload> p = () -> DefaultPayload.create("bar");
return Flux.range(1, 100).map(v -> p.get());
});
}
}
}