diff --git a/rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java b/rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java index 1c8ff726b..a7e76b8f0 100644 --- a/rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java +++ b/rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java @@ -70,7 +70,7 @@ public void requestStreamHello1000(Input input) { @Benchmark public void fireAndForgetHello(Input input) { // this is synchronous so we don't need to use a CountdownLatch to wait - input.client.fireAndForget(Input.HELLO_PAYLOAD).subscribe(input.blackHoleSubscriber); + input.client.fireAndForget(Input.HELLO_PAYLOAD).subscribe(input.voidSubscriber); } @State(Scope.Benchmark) @@ -156,37 +156,42 @@ public Mono onClose() { return Mono.just(closeable); }); - Subscriber blackHoleSubscriber; + Subscriber blackHoleSubscriber; + Subscriber voidSubscriber; RSocket client; @Setup public void setup(Blackhole bh) { - blackHoleSubscriber = - new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Object o) { - bh.consume(o); - } - - @Override - public void onError(Throwable t) { - t.printStackTrace(); - } - - @Override - public void onComplete() {} - }; + blackHoleSubscriber = subscriber(bh); + voidSubscriber = subscriber(bh); client = RSocketFactory.connect().transport(() -> Mono.just(clientConnection)).start().block(); this.bh = bh; } + + private Subscriber subscriber(Blackhole bh) { + return new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T o) { + bh.consume(o); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onComplete() {} + }; + } } } diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java index 148188abe..c41af77d2 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java @@ -15,18 +15,18 @@ import reactor.core.publisher.Mono; public class TestingStreaming { - Supplier> serverSupplier = + private Supplier> serverSupplier = () -> LocalServerTransport.create("test"); - Supplier clientSupplier = () -> LocalClientTransport.create("test"); + private Supplier clientSupplier = () -> LocalClientTransport.create("test"); @Test(expected = ApplicationException.class) - public void testRangeButThrowException() throws Exception { + public void testRangeButThrowException() { Closeable server = null; try { server = RSocketFactory.receive() - .errorConsumer(t -> t.printStackTrace()) + .errorConsumer(Throwable::printStackTrace) .acceptor( (connectionSetupPayload, rSocket) -> { AbstractRSocket abstractRSocket = @@ -65,12 +65,12 @@ public Flux requestStream(Payload payload) { } @Test - public void testRangeOfConsumers() throws Exception { + public void testRangeOfConsumers() { Closeable server = null; try { server = RSocketFactory.receive() - .errorConsumer(t -> t.printStackTrace()) + .errorConsumer(Throwable::printStackTrace) .acceptor( (connectionSetupPayload, rSocket) -> { AbstractRSocket abstractRSocket = @@ -102,30 +102,21 @@ public Flux requestStream(Payload payload) { } } - public Flux consumer(String s) { - Mono test = - RSocketFactory.connect() - .errorConsumer(t -> t.printStackTrace()) - .transport(clientSupplier) - .start(); - - Flux payloadFlux = - test.flatMapMany( + private Flux consumer(String s) { + return RSocketFactory.connect() + .errorConsumer(Throwable::printStackTrace) + .transport(clientSupplier) + .start() + .flatMapMany( rSocket -> { AtomicInteger count = new AtomicInteger(); return Flux.range(1, 100) - .flatMap( - i -> { - return rSocket.requestStream(new PayloadImpl("i -> " + i)).take(100); - }, - 1); + .flatMap(i -> rSocket.requestStream(new PayloadImpl("i -> " + i)).take(100), 1); }); - - return payloadFlux; } @Test - public void testSingleConsumer() throws Exception { + public void testSingleConsumer() { Closeable server = null; try {