diff --git a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java index 6190d24e3..22dd9d413 100644 --- a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java @@ -23,6 +23,7 @@ import org.reactivestreams.Subscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; /** Represents a connection with input/output that the protocol uses. */ public interface DuplexConnection extends Availability, Closeable { @@ -86,6 +87,14 @@ default Mono sendOne(ByteBuf frame) { */ ByteBufAllocator alloc(); + /** + * Returns associated to this connection {@link Scheduler} that will process all submitted tasks + * in an ordered / serial fashion. + * + * @return events' ordered {@link Scheduler} + */ + Scheduler eventLoopScheduler(); + @Override default double availability() { return isDisposed() ? 0.0 : 1.0; diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index fabea217b..870f9ca8c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -208,22 +208,23 @@ private Mono handleFireAndForget(Payload payload) { final AtomicBoolean once = new AtomicBoolean(); - return Mono.defer( - () -> { - if (once.getAndSet(true)) { - return Mono.error( - new IllegalStateException("FireAndForgetMono allows only a single subscriber")); - } + return Mono.defer( + () -> { + if (once.getAndSet(true)) { + return Mono.error( + new IllegalStateException("FireAndForgetMono allows only a single subscriber")); + } - final int streamId = streamIdSupplier.nextStreamId(receivers); - final ByteBuf requestFrame = - RequestFireAndForgetFrameFlyweight.encodeReleasingPayload( - allocator, streamId, payload); + final int streamId = streamIdSupplier.nextStreamId(receivers); + final ByteBuf requestFrame = + RequestFireAndForgetFrameFlyweight.encodeReleasingPayload( + allocator, streamId, payload); - sendProcessor.onNext(requestFrame); + sendProcessor.onNext(requestFrame); - return Mono.empty(); - }); + return Mono.empty(); + }) + .subscribeOn(connection.eventLoopScheduler()); } private Mono handleRequestResponse(final Payload payload) { @@ -284,6 +285,7 @@ public void hookOnTerminal(SignalType signalType) { receivers.remove(streamId, receiver); } })) + .subscribeOn(connection.eventLoopScheduler()) .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER); }); } @@ -356,6 +358,7 @@ void hookOnTerminal(SignalType signalType) { receivers.remove(streamId); } })) + .subscribeOn(connection.eventLoopScheduler()) .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER); }); } @@ -392,120 +395,125 @@ private Flux handleChannel(Payload initialPayload, Flux receiver = UnicastProcessor.create(); - return receiver.transform( - Operators.lift( - (s, actual) -> - new RequestOperator(actual) { + return receiver + .transform( + Operators.lift( + (s, actual) -> + new RequestOperator(actual) { - final BaseSubscriber upstreamSubscriber = - new BaseSubscriber() { + final BaseSubscriber upstreamSubscriber = + new BaseSubscriber() { - boolean first = true; + boolean first = true; - @Override - protected void hookOnSubscribe(Subscription subscription) { - // noops - } + @Override + protected void hookOnSubscribe(Subscription subscription) { + // noops + } - @Override - protected void hookOnNext(Payload payload) { - if (first) { - // need to skip first since we have already sent it - // no need to release it since it was released earlier on the request - // establishment - // phase - first = false; - request(1); - return; - } - if (!PayloadValidationUtils.isValid(mtu, payload)) { - payload.release(); - cancel(); - final IllegalArgumentException t = - new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE); - errorConsumer.accept(t); - // no need to send any errors. - sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId)); - receiver.onError(t); - return; - } - final ByteBuf frame = - PayloadFrameFlyweight.encodeNextReleasingPayload( - allocator, streamId, payload); - - sendProcessor.onNext(frame); - } + @Override + protected void hookOnNext(Payload payload) { + if (first) { + // need to skip first since we have already sent it + // no need to release it since it was released earlier on the + // request + // establishment + // phase + first = false; + request(1); + return; + } + if (!PayloadValidationUtils.isValid(mtu, payload)) { + payload.release(); + cancel(); + final IllegalArgumentException t = + new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE); + errorConsumer.accept(t); + // no need to send any errors. + sendProcessor.onNext( + CancelFrameFlyweight.encode(allocator, streamId)); + receiver.onError(t); + return; + } + final ByteBuf frame = + PayloadFrameFlyweight.encodeNextReleasingPayload( + allocator, streamId, payload); + + sendProcessor.onNext(frame); + } + + @Override + protected void hookOnComplete() { + ByteBuf frame = + PayloadFrameFlyweight.encodeComplete(allocator, streamId); + sendProcessor.onNext(frame); + } + + @Override + protected void hookOnError(Throwable t) { + ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t); + sendProcessor.onNext(frame); + receiver.onError(t); + } - @Override - protected void hookOnComplete() { - ByteBuf frame = PayloadFrameFlyweight.encodeComplete(allocator, streamId); - sendProcessor.onNext(frame); + @Override + protected void hookFinally(SignalType type) { + senders.remove(streamId, this); + } + }; + + @Override + void hookOnFirstRequest(long n) { + final int streamId = streamIdSupplier.nextStreamId(receivers); + this.streamId = streamId; + + final ByteBuf frame = + RequestChannelFrameFlyweight.encodeReleasingPayload( + allocator, streamId, false, n, initialPayload); + + senders.put(streamId, upstreamSubscriber); + receivers.put(streamId, receiver); + + inboundFlux + .limitRate(Queues.SMALL_BUFFER_SIZE) + .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER) + .subscribe(upstreamSubscriber); + + sendProcessor.onNext(frame); + } + + @Override + void hookOnRemainingRequests(long n) { + if (receiver.isDisposed()) { + return; } - @Override - protected void hookOnError(Throwable t) { - ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t); - sendProcessor.onNext(frame); - receiver.onError(t); + sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n)); + } + + @Override + void hookOnCancel() { + senders.remove(streamId, upstreamSubscriber); + if (receivers.remove(streamId, receiver)) { + sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId)); } + } - @Override - protected void hookFinally(SignalType type) { - senders.remove(streamId, this); + @Override + void hookOnTerminal(SignalType signalType) { + if (signalType == SignalType.ON_ERROR) { + upstreamSubscriber.cancel(); } - }; - - @Override - void hookOnFirstRequest(long n) { - final int streamId = streamIdSupplier.nextStreamId(receivers); - this.streamId = streamId; - - final ByteBuf frame = - RequestChannelFrameFlyweight.encodeReleasingPayload( - allocator, streamId, false, n, initialPayload); - - senders.put(streamId, upstreamSubscriber); - receivers.put(streamId, receiver); - - inboundFlux - .limitRate(Queues.SMALL_BUFFER_SIZE) - .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER) - .subscribe(upstreamSubscriber); - - sendProcessor.onNext(frame); - } - - @Override - void hookOnRemainingRequests(long n) { - if (receiver.isDisposed()) { - return; - } - - sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n)); - } - - @Override - void hookOnCancel() { - senders.remove(streamId, upstreamSubscriber); - if (receivers.remove(streamId, receiver)) { - sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId)); - } - } - - @Override - void hookOnTerminal(SignalType signalType) { - if (signalType == SignalType.ON_ERROR) { - upstreamSubscriber.cancel(); - } - receivers.remove(streamId, receiver); - } - - @Override - public void cancel() { - upstreamSubscriber.cancel(); - super.cancel(); - } - })); + receivers.remove(streamId, receiver); + } + + @Override + public void cancel() { + upstreamSubscriber.cancel(); + super.cancel(); + } + })) + .subscribeOn(connection.eventLoopScheduler()); } private Mono handleMetadataPush(Payload payload) { diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java index 933755bb2..3783cb380 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java @@ -24,6 +24,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; /** * A {@link DuplexConnection} implementation that reassembles {@link ByteBuf}s. @@ -75,6 +76,11 @@ public Flux receive() { }); } + @Override + public Scheduler eventLoopScheduler() { + return delegate.eventLoopScheduler(); + } + @Override public ByteBufAllocator alloc() { return delegate.alloc(); diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java index cf3eeb120..c800a4fdd 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java @@ -30,6 +30,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.scheduler.Scheduler; /** * {@link DuplexConnection#receive()} is a single stream on which the following type of frames @@ -202,6 +203,11 @@ public Flux receive() { })); } + @Override + public Scheduler eventLoopScheduler() { + return source.eventLoopScheduler(); + } + @Override public ByteBufAllocator alloc() { return source.alloc(); diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index 980de2de1..1e3801580 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -33,6 +33,7 @@ import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.*; +import reactor.core.scheduler.Scheduler; import reactor.util.concurrent.Queues; public class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder { @@ -106,6 +107,11 @@ public ResumableDuplexConnection( reconnect(duplexConnection); } + @Override + public Scheduler eventLoopScheduler() { + return curConnection.eventLoopScheduler(); + } + @Override public ByteBufAllocator alloc() { return curConnection.alloc(); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index d7cd8c24b..3f1caf401 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -66,7 +66,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; @@ -924,6 +926,56 @@ private static Stream requestNInteractions() { (rule, payload) -> rule.socket.requestChannel(Flux.just(payload)))); } + @ParameterizedTest + @MethodSource("streamIdRacingCases") + public void ensuresCorrectOrderOfStreamIdIssuingInCaseOfRacing( + BiFunction> interaction1, + BiFunction> interaction2) + throws InterruptedException { + for (int i = 1; i < 10000; i += 4) { + Payload payload = DefaultPayload.create("test"); + Publisher publisher1 = interaction1.apply(rule, payload); + Publisher publisher2 = interaction2.apply(rule, payload); + RaceTestUtils.race( + () -> publisher1.subscribe(AssertSubscriber.create()), + () -> publisher2.subscribe(AssertSubscriber.create())); + + CountDownLatch latch = new CountDownLatch(1); + rule.connection.eventLoopScheduler().schedule(latch::countDown); + + Assertions.assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue(); + + Assertions.assertThat(rule.connection.getSent()) + .extracting(FrameHeaderFlyweight::streamId) + .containsExactly(i, i + 2); + rule.connection.getSent().clear(); + } + } + + public static Stream streamIdRacingCases() { + return Stream.of( + Arguments.of( + (BiFunction>) + (r, p) -> r.socket.fireAndForget(p), + (BiFunction>) + (r, p) -> r.socket.requestResponse(p)), + Arguments.of( + (BiFunction>) + (r, p) -> r.socket.requestResponse(p), + (BiFunction>) + (r, p) -> r.socket.requestStream(p)), + Arguments.of( + (BiFunction>) + (r, p) -> r.socket.requestStream(p), + (BiFunction>) + (r, p) -> r.socket.requestChannel(Flux.just(p))), + Arguments.of( + (BiFunction>) + (r, p) -> r.socket.requestChannel(Flux.just(p)), + (BiFunction>) + (r, p) -> r.socket.fireAndForget(p))); + } + public int sendRequestResponse(Publisher response) { Subscriber sub = TestSubscriber.create(); response.subscribe(sub); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java index 58323c066..29eeacf62 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java @@ -24,6 +24,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class LocalDuplexConnection implements DuplexConnection { private final ByteBufAllocator allocator; @@ -63,6 +65,11 @@ public ByteBufAllocator alloc() { return allocator; } + @Override + public Scheduler eventLoopScheduler() { + return Schedulers.single(); + } + @Override public void dispose() { onClose.onComplete(); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java index 17a19b8c9..67060602f 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java @@ -31,6 +31,8 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; /** * An implementation of {@link DuplexConnection} that provides functionality to modify the behavior @@ -86,6 +88,11 @@ public Flux receive() { return received; } + @Override + public Scheduler eventLoopScheduler() { + return Schedulers.single(); + } + @Override public ByteBufAllocator alloc() { return allocator; diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java index 9904c2b24..d7297ebc1 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; /** * An implementation of {@link DuplexConnection} that intercepts frames and gathers Micrometer @@ -83,6 +84,11 @@ final class MicrometerDuplexConnection implements DuplexConnection { this.frameCounters = new FrameCounters(connectionType, meterRegistry, tags); } + @Override + public Scheduler eventLoopScheduler() { + return delegate.eventLoopScheduler(); + } + @Override public ByteBufAllocator alloc() { return delegate.alloc(); diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java index d69bd65e8..e3c60a432 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java @@ -28,6 +28,8 @@ import java.util.Objects; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; /** * An implementation of {@link ClientTransport} that connects to a {@link ServerTransport} in the @@ -83,11 +85,13 @@ private Mono connect() { UnboundedProcessor in = new UnboundedProcessor<>(); UnboundedProcessor out = new UnboundedProcessor<>(); MonoProcessor closeNotifier = MonoProcessor.create(); + Scheduler scheduler = Schedulers.single(Schedulers.parallel()); - server.accept(new LocalDuplexConnection(allocator, out, in, closeNotifier)); + server.accept(new LocalDuplexConnection(allocator, scheduler, out, in, closeNotifier)); return Mono.just( - (DuplexConnection) new LocalDuplexConnection(allocator, in, out, closeNotifier)); + (DuplexConnection) + new LocalDuplexConnection(allocator, scheduler, in, out, closeNotifier)); }); } diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index afaa14f95..f09773e6b 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -25,11 +25,13 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.scheduler.Scheduler; /** An implementation of {@link DuplexConnection} that connects inside the same JVM. */ final class LocalDuplexConnection implements DuplexConnection { private final ByteBufAllocator allocator; + private final Scheduler scheduler; private final Flux in; private final MonoProcessor onClose; @@ -46,10 +48,12 @@ final class LocalDuplexConnection implements DuplexConnection { */ LocalDuplexConnection( ByteBufAllocator allocator, + Scheduler scheduler, Flux in, Subscriber out, MonoProcessor onClose) { this.allocator = Objects.requireNonNull(allocator, "allocator must not be null"); + this.scheduler = Objects.requireNonNull(scheduler, "scheduler must not be null"); this.in = Objects.requireNonNull(in, "in must not be null"); this.out = Objects.requireNonNull(out, "out must not be null"); this.onClose = Objects.requireNonNull(onClose, "onClose must not be null"); @@ -94,4 +98,9 @@ public Mono sendOne(ByteBuf frame) { public ByteBufAllocator alloc() { return allocator; } + + @Override + public Scheduler eventLoopScheduler() { + return scheduler; + } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index d71d6b356..36daa355c 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -25,6 +25,8 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; /** An implementation of {@link DuplexConnection} that connects via TCP. */ @@ -32,6 +34,7 @@ public final class TcpDuplexConnection extends BaseDuplexConnection { private final Connection connection; private final boolean encodeLength; + private final Scheduler scheduler; /** * Creates a new instance @@ -51,6 +54,7 @@ public TcpDuplexConnection(Connection connection) { public TcpDuplexConnection(Connection connection, boolean encodeLength) { this.encodeLength = encodeLength; this.connection = Objects.requireNonNull(connection, "connection must not be null"); + this.scheduler = Schedulers.fromExecutorService(connection.channel().eventLoop()); connection .channel() @@ -66,6 +70,11 @@ public ByteBufAllocator alloc() { return connection.channel().alloc(); } + @Override + public Scheduler eventLoopScheduler() { + return scheduler; + } + @Override protected void doOnClose() { if (!connection.isDisposed()) { diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index 0183ef19d..e7beede52 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -24,6 +24,8 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; /** @@ -36,6 +38,7 @@ public final class WebsocketDuplexConnection extends BaseDuplexConnection { private final Connection connection; + private final Scheduler scheduler; /** * Creates a new instance @@ -44,6 +47,7 @@ public final class WebsocketDuplexConnection extends BaseDuplexConnection { */ public WebsocketDuplexConnection(Connection connection) { this.connection = Objects.requireNonNull(connection, "connection must not be null"); + this.scheduler = Schedulers.fromExecutorService(connection.channel().eventLoop()); connection .channel() @@ -59,6 +63,11 @@ public ByteBufAllocator alloc() { return connection.channel().alloc(); } + @Override + public Scheduler eventLoopScheduler() { + return scheduler; + } + @Override protected void doOnClose() { if (!connection.isDisposed()) {