From 9a6a9d0622368d6770be000189cc21de2895bfaa Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Sun, 25 Nov 2018 10:21:08 -0800 Subject: [PATCH 1/3] more performance improvements, and removed dependency on jctools Signed-off-by: Robert Roeser --- build.gradle | 2 - .../main/java/io/rsocket/RSocketClient.java | 507 +++++++++--------- .../main/java/io/rsocket/RSocketServer.java | 100 ++-- .../FragmentationDuplexConnection.java | 10 +- .../rsocket/internal/UnboundedProcessor.java | 90 +++- .../internal/UnicastMonoProcessor.java | 191 +++++++ .../transport/netty/SendPublisher.java | 73 ++- .../transport/netty/TcpDuplexConnection.java | 57 +- .../netty/WebsocketDuplexConnection.java | 166 +++--- 9 files changed, 742 insertions(+), 454 deletions(-) create mode 100644 rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java diff --git a/build.gradle b/build.gradle index e841a03f0..0b0e345c7 100644 --- a/build.gradle +++ b/build.gradle @@ -34,7 +34,6 @@ subprojects { ext['netty.version'] = '4.1.29.Final' ext['netty-boringssl.version'] = '2.0.18.Final' ext['hdrhistogram.version'] = '2.1.10' - ext['jctool.version'] = '2.1.2' ext['mockito.version'] = '2.23.0' ext['slf4j.version'] = '1.7.25' ext['jmh.version'] = '1.21' @@ -56,7 +55,6 @@ subprojects { dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}" dependency "org.assertj:assertj-core:${ext['assertj.version']}" dependency "org.hdrhistogram:HdrHistogram:${ext['hdrhistogram.version']}" - dependency "org.jctools:jctools-core:${ext['jctool.version']}" dependency "org.mockito:mockito-core:${ ext['mockito.version']}" dependency "org.slf4j:slf4j-api:${ext['slf4j.version']}" diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 385fba7c5..7a4d54a4f 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -22,35 +22,34 @@ import io.rsocket.framing.FrameType; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.internal.UnboundedProcessor; +import io.rsocket.internal.UnicastMonoProcessor; +import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; -import reactor.core.publisher.UnicastProcessor; +import reactor.core.publisher.*; import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; import java.util.function.Function; /** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */ class RSocketClient implements RSocket { - + private final DuplexConnection connection; private final Function frameDecoder; private final Consumer errorConsumer; private final StreamIdSupplier streamIdSupplier; private final Map senders; - private final Map> receivers; + private final Map> receivers; private final UnboundedProcessor sendProcessor; - private KeepAliveHandler keepAliveHandler; private final Lifecycle lifecycle = new Lifecycle(); - + private KeepAliveHandler keepAliveHandler; + /*server requester*/ RSocketClient( DuplexConnection connection, @@ -60,7 +59,7 @@ class RSocketClient implements RSocket { this( connection, frameDecoder, errorConsumer, streamIdSupplier, Duration.ZERO, Duration.ZERO, 0); } - + /*client requester*/ RSocketClient( DuplexConnection connection, @@ -74,26 +73,26 @@ class RSocketClient implements RSocket { this.frameDecoder = frameDecoder; this.errorConsumer = errorConsumer; this.streamIdSupplier = streamIdSupplier; - this.senders = Collections.synchronizedMap(new IntObjectHashMap<>()); + this.senders = Collections.synchronizedMap(new IntObjectHashMap<>()); this.receivers = Collections.synchronizedMap(new IntObjectHashMap<>()); - + // DO NOT Change the order here. The Send processor must be subscribed to before receiving this.sendProcessor = new UnboundedProcessor<>(); - + connection.onClose().doFinally(signalType -> terminate()).subscribe(null, errorConsumer); - + connection .send(sendProcessor) .doFinally(this::handleSendProcessorCancel) .subscribe(null, this::handleSendProcessorError); - + connection.receive().subscribe(this::handleIncomingFrames, errorConsumer); - + if (!Duration.ZERO.equals(tickPeriod)) { this.keepAliveHandler = KeepAliveHandler.ofClient( new KeepAliveHandler.KeepAlive(tickPeriod, ackTimeout, missedAcks)); - + keepAliveHandler .timeout() .subscribe( @@ -101,7 +100,7 @@ class RSocketClient implements RSocket { String message = String.format("No keep-alive acks for %d ms", keepAlive.getTimeoutMillis()); ConnectionErrorException err = new ConnectionErrorException(message); - lifecycle.terminate(err); + lifecycle.setTerminationError(err); errorConsumer.accept(err); connection.dispose(); }); @@ -110,9 +109,9 @@ class RSocketClient implements RSocket { keepAliveHandler = null; } } - + private void handleSendProcessorError(Throwable t) { - Throwable terminationError = lifecycle.terminationError(); + Throwable terminationError = lifecycle.getTerminationError(); Throwable err = terminationError != null ? terminationError : t; for (Subscriber subscriber : receivers.values()) { try { @@ -121,17 +120,17 @@ private void handleSendProcessorError(Throwable t) { errorConsumer.accept(e); } } - + for (LimitableRequestPublisher p : senders.values()) { p.cancel(); } } - + private void handleSendProcessorCancel(SignalType t) { if (SignalType.ON_ERROR == t) { return; } - + for (Subscriber subscriber : receivers.values()) { try { subscriber.onError(new Throwable("closed connection")); @@ -139,261 +138,265 @@ private void handleSendProcessorCancel(SignalType t) { errorConsumer.accept(e); } } - + for (LimitableRequestPublisher p : senders.values()) { p.cancel(); } } - + @Override public Mono fireAndForget(Payload payload) { return handleFireAndForget(payload); } - + @Override public Mono requestResponse(Payload payload) { return handleRequestResponse(payload); } - + @Override public Flux requestStream(Payload payload) { return handleRequestStream(payload); } - + @Override public Flux requestChannel(Publisher payloads) { return handleChannel(Flux.from(payloads)); } - + @Override public Mono metadataPush(Payload payload) { return handleMetadataPush(payload); } - + @Override public double availability() { return connection.availability(); } - + @Override public void dispose() { connection.dispose(); } - + @Override public boolean isDisposed() { return connection.isDisposed(); } - + @Override public Mono onClose() { return connection.onClose(); } - + private Mono handleFireAndForget(Payload payload) { return lifecycle - .active() - .then( - Mono.fromRunnable( - () -> { - final int streamId = streamIdSupplier.nextStreamId(); - final Frame requestFrame = - Frame.Request.from(streamId, FrameType.REQUEST_FNF, payload, 1); - payload.release(); - sendProcessor.onNext(requestFrame); - })); + .active() + .then( + Mono.fromRunnable( + () -> { + final int streamId = streamIdSupplier.nextStreamId(); + final Frame requestFrame = + Frame.Request.from(streamId, FrameType.REQUEST_FNF, payload, 1); + payload.release(); + sendProcessor.onNext(requestFrame); + })); } - + private Flux handleRequestStream(final Payload payload) { return lifecycle - .active() - .thenMany( - Flux.defer( - () -> { - int streamId = streamIdSupplier.nextStreamId(); - - UnicastProcessor receiver = UnicastProcessor.create(); - receivers.put(streamId, receiver); - - AtomicBoolean first = new AtomicBoolean(false); - - return receiver - .doOnRequest( - n -> { - if (first.compareAndSet(false, true) && !receiver.isDisposed()) { - final Frame requestFrame = - Frame.Request.from( - streamId, FrameType.REQUEST_STREAM, payload, n); - payload.release(); - sendProcessor.onNext(requestFrame); - } else if (contains(streamId) && !receiver.isDisposed()) { - sendProcessor.onNext(Frame.RequestN.from(streamId, n)); - } - sendProcessor.drain(); - }) - .doOnError( - t -> { - if (contains(streamId) && !receiver.isDisposed()) { - sendProcessor.onNext(Frame.Error.from(streamId, t)); - } - }) - .doOnCancel( - () -> { - if (contains(streamId) && !receiver.isDisposed()) { - sendProcessor.onNext(Frame.Cancel.from(streamId)); - } - }) - .doFinally( - s -> { - receivers.remove(streamId); - }); - })); + .active() + .thenMany( + Flux.defer( + () -> { + int streamId = streamIdSupplier.nextStreamId(); + + UnicastProcessor receiver = UnicastProcessor.create(); + receivers.put(streamId, receiver); + + AtomicBoolean first = new AtomicBoolean(false); + + return receiver + .doOnRequest( + n -> { + if (first.compareAndSet(false, true) && !receiver.isDisposed()) { + final Frame requestFrame = + Frame.Request.from( + streamId, FrameType.REQUEST_STREAM, payload, n); + payload.release(); + sendProcessor.onNext(requestFrame); + } else if (contains(streamId) && !receiver.isDisposed()) { + sendProcessor.onNext(Frame.RequestN.from(streamId, n)); + } + sendProcessor.drain(); + }) + .doOnError( + t -> { + if (contains(streamId) && !receiver.isDisposed()) { + sendProcessor.onNext(Frame.Error.from(streamId, t)); + } + }) + .doOnCancel( + () -> { + if (contains(streamId) && !receiver.isDisposed()) { + sendProcessor.onNext(Frame.Cancel.from(streamId)); + } + }) + .doFinally( + s -> { + receivers.remove(streamId); + }); + })); } - + private Mono handleRequestResponse(final Payload payload) { return lifecycle - .active() - .then( - Mono.defer( - () -> { - int streamId = streamIdSupplier.nextStreamId(); - final Frame requestFrame = - Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1); - payload.release(); - - UnicastProcessor receiver = UnicastProcessor.create(); - receivers.put(streamId, receiver); - - sendProcessor.onNext(requestFrame); - - return receiver - .singleOrEmpty() - .doOnError(t -> sendProcessor.onNext(Frame.Error.from(streamId, t))) - .doOnCancel(() -> sendProcessor.onNext(Frame.Cancel.from(streamId))) - .doFinally( - s -> { - receivers.remove(streamId); - }); - })); + .active() + .then( + Mono.defer( + () -> { + int streamId = streamIdSupplier.nextStreamId(); + final Frame requestFrame = + Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1); + payload.release(); + + UnicastMonoProcessor receiver = UnicastMonoProcessor.create(); + receivers.put(streamId, receiver); + + sendProcessor.onNext(requestFrame); + + return receiver + .doOnError(t -> sendProcessor.onNext(Frame.Error.from(streamId, t))) + .doFinally( + s -> { + if (s == SignalType.CANCEL) { + sendProcessor.onNext(Frame.Cancel.from(streamId)); + } + + receivers.remove(streamId); + }); + })); } - + private Flux handleChannel(Flux request) { return lifecycle - .active() - .thenMany( - Flux.defer( - () -> { - final UnicastProcessor receiver = UnicastProcessor.create(); - final int streamId = streamIdSupplier.nextStreamId(); - final AtomicBoolean firstRequest = new AtomicBoolean(true); - - return receiver - .doOnRequest( - n -> { - if (firstRequest.compareAndSet(true, false)) { - final AtomicBoolean firstPayload = new AtomicBoolean(true); - final Flux requestFrames = - request - .transform( - f -> { - LimitableRequestPublisher wrapped = - LimitableRequestPublisher.wrap(f); - // Need to set this to one for first the frame - wrapped.increaseRequestLimit(1); - senders.put(streamId, wrapped); - receivers.put(streamId, receiver); - - return wrapped; - }) - .map( - payload -> { - final Frame requestFrame; - if (firstPayload.compareAndSet(true, false)) { - requestFrame = - Frame.Request.from( - streamId, - FrameType.REQUEST_CHANNEL, - payload, - n); - } else { - requestFrame = - Frame.PayloadFrame.from( - streamId, FrameType.NEXT, payload); - } - payload.release(); - return requestFrame; - }) - .doOnComplete( - () -> { + .active() + .thenMany( + Flux.defer( + () -> { + final UnicastProcessor receiver = UnicastProcessor.create(); + final int streamId = streamIdSupplier.nextStreamId(); + final AtomicBoolean firstRequest = new AtomicBoolean(true); + + return receiver + .doOnRequest( + n -> { + if (firstRequest.compareAndSet(true, false)) { + final AtomicBoolean firstPayload = new AtomicBoolean(true); + final Flux requestFrames = + request + .transform( + f -> { + LimitableRequestPublisher wrapped = + LimitableRequestPublisher.wrap(f); + // Need to set this to one for first the frame + wrapped.increaseRequestLimit(1); + senders.put(streamId, wrapped); + receivers.put(streamId, receiver); + + return wrapped; + }) + .map( + payload -> { + final Frame requestFrame; + if (firstPayload.compareAndSet(true, false)) { + requestFrame = + Frame.Request.from( + streamId, + FrameType.REQUEST_CHANNEL, + payload, + n); + } else { + requestFrame = + Frame.PayloadFrame.from( + streamId, FrameType.NEXT, payload); + } + payload.release(); + return requestFrame; + }) + .doOnComplete( + () -> { + if (contains(streamId) && !receiver.isDisposed()) { + sendProcessor.onNext( + Frame.PayloadFrame.from( + streamId, FrameType.COMPLETE)); + } + if (firstPayload.get()) { + receiver.onComplete(); + } + }); + + requestFrames.subscribe( + sendProcessor::onNext, + t -> { + errorConsumer.accept(t); + receiver.dispose(); + }); + } else { if (contains(streamId) && !receiver.isDisposed()) { - sendProcessor.onNext( - Frame.PayloadFrame.from( - streamId, FrameType.COMPLETE)); + sendProcessor.onNext(Frame.RequestN.from(streamId, n)); } - if (firstPayload.get()) { - receiver.onComplete(); - } - }); - - requestFrames.subscribe( - sendProcessor::onNext, - t -> { - errorConsumer.accept(t); - receiver.dispose(); - }); - } else { - if (contains(streamId) && !receiver.isDisposed()) { - sendProcessor.onNext(Frame.RequestN.from(streamId, n)); - } - } - }) - .doOnError( - t -> { - if (contains(streamId) && !receiver.isDisposed()) { - sendProcessor.onNext(Frame.Error.from(streamId, t)); - } - }) - .doOnCancel( - () -> { - if (contains(streamId) && !receiver.isDisposed()) { - sendProcessor.onNext(Frame.Cancel.from(streamId)); - } - }) - .doFinally( - s -> { - receivers.remove(streamId); - LimitableRequestPublisher sender = senders.remove(streamId); - if (sender != null) { - sender.cancel(); - } - }); - })); + } + }) + .doOnError( + t -> { + if (contains(streamId) && !receiver.isDisposed()) { + sendProcessor.onNext(Frame.Error.from(streamId, t)); + } + }) + .doOnCancel( + () -> { + if (contains(streamId) && !receiver.isDisposed()) { + sendProcessor.onNext(Frame.Cancel.from(streamId)); + } + }) + .doFinally( + s -> { + receivers.remove(streamId); + LimitableRequestPublisher sender = senders.remove(streamId); + if (sender != null) { + sender.cancel(); + } + }); + })); } - + private Mono handleMetadataPush(Payload payload) { return lifecycle - .active() - .then(Mono.fromRunnable( - () -> { - final Frame requestFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1); - payload.release(); - sendProcessor.onNext(requestFrame); - })); + .active() + .then( + Mono.fromRunnable( + () -> { + final Frame requestFrame = + Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1); + payload.release(); + sendProcessor.onNext(requestFrame); + })); } - + private boolean contains(int streamId) { return receivers.containsKey(streamId); } - + protected void terminate() { - - lifecycle.terminate(new ClosedChannelException()); - + + lifecycle.setTerminationError(new ClosedChannelException()); + if (keepAliveHandler != null) { keepAliveHandler.dispose(); } try { - for (UnicastProcessor subscriber : receivers.values()) { + for (Processor subscriber : receivers.values()) { cleanUpSubscriber(subscriber); } for (LimitableRequestPublisher p : senders.values()) { @@ -405,7 +408,7 @@ protected void terminate() { sendProcessor.dispose(); } } - + private synchronized void cleanUpLimitableRequestPublisher( LimitableRequestPublisher limitableRequestPublisher) { try { @@ -414,15 +417,15 @@ private synchronized void cleanUpLimitableRequestPublisher( errorConsumer.accept(t); } } - - private synchronized void cleanUpSubscriber(UnicastProcessor subscriber) { + + private synchronized void cleanUpSubscriber(Processor subscriber) { try { - subscriber.onError(lifecycle.terminationError()); + subscriber.onError(lifecycle.getTerminationError()); } catch (Throwable t) { errorConsumer.accept(t); } } - + private void handleIncomingFrames(Frame frame) { try { int streamId = frame.getStreamId(); @@ -436,12 +439,12 @@ private void handleIncomingFrames(Frame frame) { frame.release(); } } - + private void handleStreamZero(FrameType type, Frame frame) { switch (type) { case ERROR: RuntimeException error = Exceptions.from(frame); - lifecycle.terminate(error); + lifecycle.setTerminationError(error); errorConsumer.accept(error); connection.dispose(); break; @@ -459,7 +462,7 @@ private void handleStreamZero(FrameType type, Frame frame) { "Client received supported frame on stream 0: " + frame.toString())); } } - + private void handleFrame(int streamId, FrameType type, Frame frame) { Subscriber receiver = receivers.get(streamId); if (receiver == null) { @@ -475,27 +478,27 @@ private void handleFrame(int streamId, FrameType type, Frame frame) { receiver.onComplete(); break; case CANCEL: - { - LimitableRequestPublisher sender = senders.remove(streamId); - receivers.remove(streamId); - if (sender != null) { - sender.cancel(); - } - break; + { + LimitableRequestPublisher sender = senders.remove(streamId); + receivers.remove(streamId); + if (sender != null) { + sender.cancel(); } + break; + } case NEXT: receiver.onNext(frameDecoder.apply(frame)); break; case REQUEST_N: - { - LimitableRequestPublisher sender = senders.get(streamId); - if (sender != null) { - int n = Frame.RequestN.requestN(frame); - sender.increaseRequestLimit(n); - sendProcessor.drain(); - } - break; + { + LimitableRequestPublisher sender = senders.get(streamId); + if (sender != null) { + int n = Frame.RequestN.requestN(frame); + sender.increaseRequestLimit(n); + sendProcessor.drain(); } + break; + } case COMPLETE: receiver.onComplete(); receivers.remove(streamId); @@ -506,14 +509,14 @@ private void handleFrame(int streamId, FrameType type, Frame frame) { } } } - + private void handleMissingResponseProcessor(int streamId, FrameType type, Frame frame) { if (!streamIdSupplier.isBeforeOrCurrent(streamId)) { if (type == FrameType.ERROR) { // message for stream that has never existed, we have a problem with // the overall connection and must tear down String errorMessage = frame.getDataUtf8(); - + throw new IllegalStateException( "Client received error for non-existent stream: " + streamId @@ -530,29 +533,31 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame // receiving a frame after a given stream has been cancelled/completed, // so ignore (cancellation is async so there is a race condition) } - + private static class Lifecycle { - - private final AtomicReference terminationError = new AtomicReference<>(); - + + private static final AtomicReferenceFieldUpdater TERMINATION_ERROR = + AtomicReferenceFieldUpdater.newUpdater( + Lifecycle.class, Throwable.class, "terminationError"); + private volatile Throwable terminationError; + public Mono active() { return Mono.create( sink -> { - Throwable err = terminationError(); - if (err == null) { + if (terminationError == null) { sink.success(); } else { - sink.error(err); + sink.error(terminationError); } }); } - - public void terminate(Throwable err) { - this.terminationError.compareAndSet(null, err); + + public void setTerminationError(Throwable err) { + TERMINATION_ERROR.compareAndSet(this, null, err); } - - public Throwable terminationError() { - return terminationError.get(); + + public Throwable getTerminationError() { + return terminationError; } } } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index bc0bfcf17..3b47d59e3 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -22,14 +22,12 @@ import io.rsocket.framing.FrameType; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.internal.UnboundedProcessor; +import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; -import reactor.core.publisher.UnicastProcessor; +import reactor.core.publisher.*; import java.util.Collections; import java.util.Map; @@ -42,18 +40,18 @@ /** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */ class RSocketServer implements RSocket { - + private final DuplexConnection connection; private final RSocket requestHandler; private final Function frameDecoder; private final Consumer errorConsumer; - + private final Map sendingSubscriptions; - private final Map> channelProcessors; - + private final Map> channelProcessors; + private final UnboundedProcessor sendProcessor; private KeepAliveHandler keepAliveHandler; - + /*client responder*/ RSocketServer( DuplexConnection connection, @@ -62,7 +60,7 @@ class RSocketServer implements RSocket { Consumer errorConsumer) { this(connection, requestHandler, frameDecoder, errorConsumer, 0, 0); } - + /*server responder*/ RSocketServer( DuplexConnection connection, @@ -77,18 +75,18 @@ class RSocketServer implements RSocket { this.errorConsumer = errorConsumer; this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>()); this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>()); - + // DO NOT Change the order here. The Send processor must be subscribed to before receiving // connections this.sendProcessor = new UnboundedProcessor<>(); - + connection .send(sendProcessor) .doFinally(this::handleSendProcessorCancel) .subscribe(null, this::handleSendProcessorError); - + Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer); - + this.connection .onClose() .doFinally( @@ -97,11 +95,11 @@ class RSocketServer implements RSocket { receiveDisposable.dispose(); }) .subscribe(null, errorConsumer); - + if (tickPeriod != 0) { keepAliveHandler = KeepAliveHandler.ofServer(new KeepAliveHandler.KeepAlive(tickPeriod, ackTimeout)); - + keepAliveHandler .timeout() .subscribe( @@ -116,7 +114,7 @@ class RSocketServer implements RSocket { keepAliveHandler = null; } } - + private void handleSendProcessorError(Throwable t) { for (Subscription subscription : sendingSubscriptions.values()) { try { @@ -125,21 +123,21 @@ private void handleSendProcessorError(Throwable t) { errorConsumer.accept(e); } } - - for (UnicastProcessor subscription : channelProcessors.values()) { + + for (Processor subscription : channelProcessors.values()) { try { - subscription.cancel(); + subscription.onError(t); } catch (Throwable e) { errorConsumer.accept(e); } } } - + private void handleSendProcessorCancel(SignalType t) { if (SignalType.ON_ERROR == t) { return; } - + for (Subscription subscription : sendingSubscriptions.values()) { try { subscription.cancel(); @@ -147,16 +145,16 @@ private void handleSendProcessorCancel(SignalType t) { errorConsumer.accept(e); } } - - for (UnicastProcessor subscription : channelProcessors.values()) { + + for (Processor subscription : channelProcessors.values()) { try { - subscription.cancel(); + subscription.onComplete(); } catch (Throwable e) { errorConsumer.accept(e); } } } - + @Override public Mono fireAndForget(Payload payload) { try { @@ -165,7 +163,7 @@ public Mono fireAndForget(Payload payload) { return Mono.error(t); } } - + @Override public Mono requestResponse(Payload payload) { try { @@ -174,7 +172,7 @@ public Mono requestResponse(Payload payload) { return Mono.error(t); } } - + @Override public Flux requestStream(Payload payload) { try { @@ -183,7 +181,7 @@ public Flux requestStream(Payload payload) { return Flux.error(t); } } - + @Override public Flux requestChannel(Publisher payloads) { try { @@ -192,7 +190,7 @@ public Flux requestChannel(Publisher payloads) { return Flux.error(t); } } - + @Override public Mono metadataPush(Payload payload) { try { @@ -201,43 +199,45 @@ public Mono metadataPush(Payload payload) { return Mono.error(t); } } - + @Override public void dispose() { connection.dispose(); } - + @Override public boolean isDisposed() { return connection.isDisposed(); } - + @Override public Mono onClose() { return connection.onClose(); } - + private void cleanup() { if (keepAliveHandler != null) { keepAliveHandler.dispose(); } cleanUpSendingSubscriptions(); cleanUpChannelProcessors(); - + requestHandler.dispose(); sendProcessor.dispose(); } - + private synchronized void cleanUpSendingSubscriptions() { sendingSubscriptions.values().forEach(Subscription::cancel); sendingSubscriptions.clear(); } - + private synchronized void cleanUpChannelProcessors() { - channelProcessors.values().forEach(Subscription::cancel); + channelProcessors + .values() + .forEach(Processor::onComplete); channelProcessors.clear(); } - + private void handleFrame(Frame frame) { try { int streamId = frame.getStreamId(); @@ -313,14 +313,14 @@ private void handleFrame(Frame frame) { frame.release(); } } - + private void handleFireAndForget(int streamId, Mono result) { result .doOnSubscribe(subscription -> sendingSubscriptions.put(streamId, subscription)) .doFinally(signalType -> sendingSubscriptions.remove(streamId)) .subscribe(null, errorConsumer); } - + private void handleRequestResponse(int streamId, Mono response) { response .doOnSubscribe(subscription -> sendingSubscriptions.put(streamId, subscription)) @@ -340,7 +340,7 @@ private void handleRequestResponse(int streamId, Mono response) { .doFinally(signalType -> sendingSubscriptions.remove(streamId)) .subscribe(sendProcessor::onNext, t -> handleError(streamId, t)); } - + private void handleStream(int streamId, Flux response, int initialRequestN) { response .transform( @@ -364,44 +364,44 @@ private void handleStream(int streamId, Flux response, int initialReque sendProcessor.onNext(frame); }); } - + private void handleChannel(int streamId, Payload payload, int initialRequestN) { UnicastProcessor frames = UnicastProcessor.create(); channelProcessors.put(streamId, frames); - + Flux payloads = frames .doOnCancel(() -> sendProcessor.onNext(Frame.Cancel.from(streamId))) .doOnError(t -> sendProcessor.onNext(Frame.Error.from(streamId, t))) .doOnRequest(l -> sendProcessor.onNext(Frame.RequestN.from(streamId, l))) .doFinally(signalType -> channelProcessors.remove(streamId)); - + // not chained, as the payload should be enqueued in the Unicast processor before this method // returns // and any later payload can be processed frames.onNext(payload); - + handleStream(streamId, requestChannel(payloads), initialRequestN); } - + private void handleKeepAliveFrame(Frame frame) { if (keepAliveHandler != null) { keepAliveHandler.receive(frame); } } - + private void handleCancelFrame(int streamId) { Subscription subscription = sendingSubscriptions.remove(streamId); if (subscription != null) { subscription.cancel(); } } - + private void handleError(int streamId, Throwable t) { errorConsumer.accept(t); sendProcessor.onNext(Frame.Error.from(streamId, t)); } - + private void handleRequestN(int streamId, Frame frame) { final Subscription subscription = sendingSubscriptions.get(streamId); if (subscription != null) { diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java index 0ac558ce5..fd459fe03 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java @@ -19,7 +19,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import io.netty.util.collection.IntObjectHashMap; -import io.netty.util.collection.LongObjectHashMap; import io.rsocket.DuplexConnection; import io.rsocket.Frame; import io.rsocket.util.AbstractionLeakingFrameUtils; @@ -90,12 +89,10 @@ public FragmentationDuplexConnection( .doFinally( signalType -> { Collection values; - synchronized (this) { + synchronized (FragmentationDuplexConnection.this) { values = frameReassemblers.values(); } - for (FrameReassembler reassembler : values) { - reassembler.dispose(); - } + values.forEach(FrameReassembler::dispose); }) .subscribe(); } @@ -146,11 +143,10 @@ private Flux toFragmentedFrames(int streamId, io.rsocket.framing.Frame fr private Mono toReassembledFrames(int streamId, io.rsocket.framing.Frame fragment) { FrameReassembler frameReassembler; - synchronized (this) { frameReassembler = frameReassemblers.computeIfAbsent( - streamId, i -> createFrameReassembler(byteBufAllocator)); + streamId, i -> createFrameReassembler(byteBufAllocator)); } return Mono.justOrEmpty(frameReassembler.reassemble(fragment)) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 341de4608..bcfa77287 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -17,6 +17,7 @@ package io.rsocket.internal; import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.shaded.org.jctools.queues.MpscUnboundedArrayQueue; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -43,32 +44,24 @@ public final class UnboundedProcessor extends FluxProcessor implements Fuseable.QueueSubscription, Fuseable { - final Queue queue; - - volatile boolean done; - Throwable error; - - volatile CoreSubscriber actual; - - volatile boolean cancelled; - - volatile int once; - @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE = AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "once"); - - volatile int wip; - @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "wip"); - - volatile long requested; - @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested"); + final Queue queue; + volatile boolean done; + Throwable error; + volatile CoreSubscriber actual; + volatile boolean cancelled; + volatile int once; + volatile int wip; + volatile long requested; + volatile boolean outputFused; public UnboundedProcessor() { this.queue = Queues.unboundedMultiproducer().get(); @@ -130,20 +123,60 @@ void drainRegular(Subscriber a) { } } } - + + void drainFused(Subscriber a) { + int missed = 1; + + final Queue q = queue; + + for (;;) { + + if (cancelled) { + q.clear(); + actual = null; + return; + } + + boolean d = done; + + a.onNext(null); + + if (d) { + actual = null; + + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + + missed = WIP.addAndGet(this, -missed); + if (missed == 0) { + break; + } + } + } + + public void drain() { if (WIP.getAndIncrement(this) != 0) { return; } int missed = 1; - + for (; ; ) { Subscriber a = actual; if (a != null) { - - drainRegular(a); - + + if (outputFused) { + drainFused(a); + } else { + drainRegular(a); + } return; } @@ -281,6 +314,11 @@ public void cancel() { } } + @Override + public T peek() { + return queue.peek(); + } + @Override @Nullable public T poll() { @@ -306,12 +344,16 @@ public void clear() { } } } - + @Override public int requestFusion(int requestedMode) { + if ((requestedMode & Fuseable.ASYNC) != 0) { + outputFused = true; + return Fuseable.ASYNC; + } return Fuseable.NONE; } - + @Override public void dispose() { cancel(); diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java new file mode 100644 index 000000000..47849c4c0 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java @@ -0,0 +1,191 @@ +package io.rsocket.internal; + +import org.reactivestreams.Processor; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Disposable; +import reactor.core.Scannable; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Operators; +import reactor.util.annotation.Nullable; +import reactor.util.context.Context; +import reactor.util.function.Tuple2; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.LongSupplier; +import java.util.stream.Stream; + +public class UnicastMonoProcessor extends Mono + implements Processor, + CoreSubscriber, + Disposable, + Subscription, + Scannable, + LongSupplier { + + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ONCE = + AtomicIntegerFieldUpdater.newUpdater(UnicastMonoProcessor.class, "once"); + + private final MonoProcessor processor; + + @SuppressWarnings("unused") + private volatile int once; + + private UnicastMonoProcessor() { + this.processor = MonoProcessor.create(); + } + + public static UnicastMonoProcessor create() { + return new UnicastMonoProcessor<>(); + } + + @Override + public Stream actuals() { + return processor.actuals(); + } + + @Override + public boolean isScanAvailable() { + return processor.isScanAvailable(); + } + + @Override + public String name() { + return processor.name(); + } + + @Override + public String stepName() { + return processor.stepName(); + } + + @Override + public Stream steps() { + return processor.steps(); + } + + @Override + public Stream parents() { + return processor.parents(); + } + + @Override + @Nullable + public T scan(Attr key) { + return processor.scan(key); + } + + @Override + public T scanOrDefault(Attr key, T defaultValue) { + return processor.scanOrDefault(key, defaultValue); + } + + @Override + public Stream> tags() { + return processor.tags(); + } + + @Override + public long getAsLong() { + return processor.getAsLong(); + } + + @Override + public void onSubscribe(Subscription s) { + processor.onSubscribe(s); + } + + @Override + public void onNext(O o) { + processor.onNext(o); + } + + @Override + public void onError(Throwable t) { + processor.onError(t); + } + + @Nullable + public Throwable getError() { + return processor.getError(); + } + + public boolean isCancelled() { + return processor.isCancelled(); + } + + public boolean isError() { + return processor.isError(); + } + + public boolean isSuccess() { + return processor.isSuccess(); + } + + public boolean isTerminated() { + return processor.isTerminated(); + } + + @Nullable + public O peek() { + return processor.peek(); + } + + public long downstreamCount() { + return processor.downstreamCount(); + } + + public boolean hasDownstreams() { + return processor.hasDownstreams(); + } + + @Override + public void onComplete() { + processor.onComplete(); + } + + @Override + public void request(long n) { + processor.request(n); + } + + @Override + public void cancel() { + processor.cancel(); + } + + @Override + public void dispose() { + processor.dispose(); + } + + @Override + public Context currentContext() { + return processor.currentContext(); + } + + @Override + public boolean isDisposed() { + return processor.isDisposed(); + } + + @Override + public Object scanUnsafe(Attr key) { + return processor.scanUnsafe(key); + } + + @Override + public void subscribe(CoreSubscriber actual) { + Objects.requireNonNull(actual, "subscribe"); + if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { + processor.subscribe(actual); + } else { + Operators.error( + actual, + new IllegalStateException("UnicastMonoProcessor allows only a single Subscriber")); + } + } +} diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java index 29c5f7ced..c8d57de30 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java @@ -10,6 +10,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; import reactor.core.publisher.Flux; import reactor.core.publisher.Operators; import reactor.util.concurrent.Queues; @@ -27,20 +28,23 @@ class SendPublisher extends Flux { private static final int MAX_SIZE = Queues.SMALL_BUFFER_SIZE; private static final int REFILL_SIZE = MAX_SIZE / 2; - private static final AtomicReferenceFieldUpdater - INNER_SUBSCRIBER = - AtomicReferenceFieldUpdater.newUpdater( - SendPublisher.class, Object.class, "innerSubscriber"); + private static final AtomicReferenceFieldUpdater INNER_SUBSCRIBER = + AtomicReferenceFieldUpdater.newUpdater(SendPublisher.class, Object.class, "innerSubscriber"); + private static final AtomicIntegerFieldUpdater TERMINATED = + AtomicIntegerFieldUpdater.newUpdater(SendPublisher.class, "terminated"); private final Publisher source; private final Channel channel; private final EventLoop eventLoop; - private final Queue queue; - private final AtomicBoolean terminated = new AtomicBoolean(); + private final Queue queue; private final AtomicBoolean completed = new AtomicBoolean(); private final Function transformer; private final SizeOf sizeOf; - + + @SuppressWarnings("unused") + private volatile int terminated; + private int pending; + @SuppressWarnings("unused") private volatile int wip; @@ -51,18 +55,31 @@ class SendPublisher extends Flux { private long requestedUpstream = MAX_SIZE; + private boolean fuse; + @SuppressWarnings("unchecked") SendPublisher( Publisher source, Channel channel, Function transformer, SizeOf sizeOf) { + this(Queues.small().get(), source, channel, transformer, sizeOf); + } + + @SuppressWarnings("unchecked") + SendPublisher( + Queue queue, + Publisher source, + Channel channel, + Function transformer, + SizeOf sizeOf) { this.source = source; this.channel = channel; - this.queue = Queues.small().get(); + this.queue = queue; this.eventLoop = channel.eventLoop(); this.transformer = transformer; this.sizeOf = sizeOf; + + fuse = queue instanceof Fuseable.QueueSubscription; } - @SuppressWarnings("unchecked") private ChannelPromise writeCleanupPromise(V poll) { return channel .newPromise() @@ -90,9 +107,9 @@ private void tryComplete(InnerSubscriber is) { if (pending == 0 && completed.get() && queue.isEmpty() - && !terminated.get() + && terminated == 0 && !is.pendingFlush.get()) { - terminated.set(true); + TERMINATED.set(SendPublisher.this, 1); is.destination.onComplete(); } } @@ -101,12 +118,13 @@ private void tryComplete(InnerSubscriber is) { public void subscribe(CoreSubscriber destination) { InnerSubscriber innerSubscriber = new InnerSubscriber(destination); if (!INNER_SUBSCRIBER.compareAndSet(this, null, innerSubscriber)) { - throw new IllegalStateException("SendPublisher only allows one subscription"); + Operators.error( + destination, new IllegalStateException("SendPublisher only allows one subscription")); + } else { + InnerSubscription innerSubscription = new InnerSubscription(innerSubscriber); + destination.onSubscribe(innerSubscription); + source.subscribe(innerSubscriber); } - - InnerSubscription innerSubscription = new InnerSubscription(innerSubscriber); - destination.onSubscribe(innerSubscription); - source.subscribe(innerSubscriber); } @FunctionalInterface @@ -132,8 +150,8 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Frame t) { - if (!terminated.get()) { - if (!queue.offer(transformer.apply(t))) { + if (terminated == 0) { + if (!fuse && !queue.offer(t)) { throw new IllegalStateException("missing back pressure"); } tryDrain(); @@ -142,7 +160,7 @@ public void onNext(Frame t) { @Override public void onError(Throwable t) { - if (terminated.compareAndSet(false, true)) { + if (TERMINATED.compareAndSet(SendPublisher.this, 0, 1)) { try { s.cancel(); destination.onError(t); @@ -180,7 +198,7 @@ private void flush() { } private void tryDrain() { - if (wip == 0 && !terminated.get() && WIP.getAndIncrement(SendPublisher.this) == 0) { + if (wip == 0 && terminated == 0 && WIP.getAndIncrement(SendPublisher.this) == 0) { try { if (eventLoop.inEventLoop()) { drain(); @@ -202,8 +220,9 @@ private void drain() { long r = Math.min(requested, requestedUpstream); while (r-- > 0) { - V poll = queue.poll(); - if (poll != null && !terminated.get()) { + Frame frame = queue.poll(); + if (frame != null && terminated == 0) { + V poll = transformer.apply(frame); int readableBytes = sizeOf.size(poll); pending++; if (channel.isWritable() && readableBytes <= channel.bytesBeforeUnwritable()) { @@ -225,7 +244,7 @@ private void drain() { eventLoop.execute(this::flush); } - if (terminated.get()) { + if (terminated == 1) { break; } @@ -259,7 +278,13 @@ public void request(long n) { @Override public void cancel() { - terminated.set(true); + TERMINATED.set(SendPublisher.this, 1); + while (!queue.isEmpty()) { + Frame poll = queue.poll(); + if (poll != null) { + ReferenceCountUtil.safeRelease(poll); + } + } } } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index 09fd07325..4f6fa3086 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -21,16 +21,18 @@ import io.rsocket.Frame; import org.reactivestreams.Publisher; import reactor.core.Disposable; +import reactor.core.Fuseable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.FutureMono; import java.util.Objects; +import java.util.Queue; /** An implementation of {@link DuplexConnection} that connects via TCP. */ public final class TcpDuplexConnection implements DuplexConnection { - + private final Connection connection; private final Disposable channelClosed; /** @@ -50,44 +52,57 @@ public TcpDuplexConnection(Connection connection) { }) .subscribe(); } - + @Override public void dispose() { connection.dispose(); } - + @Override public boolean isDisposed() { return connection.isDisposed(); } - + @Override public Mono onClose() { return connection - .onDispose() - .doFinally( - s -> { - if (!channelClosed.isDisposed()) { - channelClosed.dispose(); - } - }); + .onDispose() + .doFinally( + s -> { + if (!channelClosed.isDisposed()) { + channelClosed.dispose(); + } + }); } - + @Override public Flux receive() { return connection.inbound().receive().map(buf -> Frame.from(buf.retain())); } - + @Override public Mono send(Publisher frames) { return Flux.from(frames) - .transform( - frameFlux -> - new SendPublisher<>( - frameFlux, - connection.channel(), - frame -> frame.content().retain(), - ByteBuf::readableBytes)) - .then(); + .transform( + frameFlux -> { + if (frameFlux instanceof Fuseable.QueueSubscription) { + Fuseable.QueueSubscription queueSubscription = + (Fuseable.QueueSubscription) frameFlux; + queueSubscription.requestFusion(Fuseable.ASYNC); + return new SendPublisher<>( + queueSubscription, + frameFlux, + connection.channel(), + frame -> frame.content().retain(), + ByteBuf::readableBytes); + } else { + return new SendPublisher<>( + frameFlux, + connection.channel(), + frame -> frame.content().retain(), + ByteBuf::readableBytes); + } + }) + .then(); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index 4154dc630..f2d79e198 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -23,10 +23,12 @@ import io.rsocket.frame.FrameHeaderFlyweight; import org.reactivestreams.Publisher; import reactor.core.Disposable; +import reactor.core.Fuseable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.FutureMono; +import reactor.util.concurrent.Queues; import java.util.Objects; @@ -41,79 +43,93 @@ * back on for frames received. */ public final class WebsocketDuplexConnection implements DuplexConnection { - - private final Connection connection; - private final Disposable channelClosed; - - /** - * Creates a new instance - * - * @param connection the {@link Connection} to for managing the server - */ - public WebsocketDuplexConnection(Connection connection) { - this.connection = Objects.requireNonNull(connection, "connection must not be null"); - this.channelClosed = - FutureMono.from(connection.channel().closeFuture()) - .doFinally( - s -> { - if (!isDisposed()) { - dispose(); - } - }) - .subscribe(); - } - - @Override - public void dispose() { - connection.dispose(); - } - - @Override - public boolean isDisposed() { - return connection.isDisposed(); - } - - @Override - public Mono onClose() { - return connection - .onDispose() - .doFinally( - s -> { - if (!channelClosed.isDisposed()) { - channelClosed.dispose(); - } - }); - } - - @Override - public Flux receive() { - return connection - .inbound() - .receive() - .map( - buf -> { - CompositeByteBuf composite = connection.channel().alloc().compositeBuffer(); - ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]); - FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes()); - composite.addComponents(true, length, buf.retain()); - return Frame.from(composite); - }); - } - - @Override - public Mono send(Publisher frames) { - return Flux.from(frames) - .transform( - frameFlux -> - new SendPublisher<>( - frameFlux, - connection.channel(), - this::toBinaryWebSocketFrame, - binaryWebSocketFrame -> binaryWebSocketFrame.content().readableBytes())) - .then(); - } - - private BinaryWebSocketFrame toBinaryWebSocketFrame(Frame frame) { - return new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE).retain()); - } + + private final Connection connection; + private final Disposable channelClosed; + + /** + * Creates a new instance + * + * @param connection the {@link Connection} to for managing the server + */ + public WebsocketDuplexConnection(Connection connection) { + this.connection = Objects.requireNonNull(connection, "connection must not be null"); + this.channelClosed = + FutureMono.from(connection.channel().closeFuture()) + .doFinally( + s -> { + if (!isDisposed()) { + dispose(); + } + }) + .subscribe(); + } + + @Override + public void dispose() { + connection.dispose(); + } + + @Override + public boolean isDisposed() { + return connection.isDisposed(); + } + + @Override + public Mono onClose() { + return connection + .onDispose() + .doFinally( + s -> { + if (!channelClosed.isDisposed()) { + channelClosed.dispose(); + } + }); + } + + @Override + public Flux receive() { + return connection + .inbound() + .receive() + .map( + buf -> { + CompositeByteBuf composite = connection.channel().alloc().compositeBuffer(); + ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]); + FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes()); + composite.addComponents(true, length, buf.retain()); + return Frame.from(composite); + }); + } + + @Override + public Mono send(Publisher frames) { + return Flux.from(frames) + .transform( + frameFlux -> { + if (frameFlux instanceof Fuseable.QueueSubscription) { + Fuseable.QueueSubscription queueSubscription = + (Fuseable.QueueSubscription) frameFlux; + queueSubscription.requestFusion(Fuseable.ASYNC); + return new SendPublisher<>( + queueSubscription, + frameFlux, + connection.channel(), + this::toBinaryWebSocketFrame, + binaryWebSocketFrame -> binaryWebSocketFrame.content().readableBytes()); + } else { + return new SendPublisher<>( + Queues.small().get(), + frameFlux, + connection.channel(), + this::toBinaryWebSocketFrame, + binaryWebSocketFrame -> binaryWebSocketFrame.content().readableBytes()); + } + }) + .then(); + } + + private BinaryWebSocketFrame toBinaryWebSocketFrame(Frame frame) { + return new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE).retain()); + } } From 5390ded5082dd7f64124b52418939eef285698ba Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Sun, 25 Nov 2018 10:22:06 -0800 Subject: [PATCH 2/3] jctool ref Signed-off-by: Robert Roeser --- rsocket-core/build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/rsocket-core/build.gradle b/rsocket-core/build.gradle index 780222efd..d62452619 100644 --- a/rsocket-core/build.gradle +++ b/rsocket-core/build.gradle @@ -38,7 +38,6 @@ dependencies { testImplementation 'org.mockito:mockito-core' testRuntimeOnly 'ch.qos.logback:logback-classic' - testRuntimeOnly 'org.jctools:jctools-core' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' // TODO: Remove after JUnit5 migration From 1f6987932ba9c804de8629d21049a91f0060d5b4 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Mon, 26 Nov 2018 10:57:03 -0800 Subject: [PATCH 3/3] updating version number Signed-off-by: Robert Roeser --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 618b0e31a..9fedf8f79 100644 --- a/gradle.properties +++ b/gradle.properties @@ -12,4 +12,4 @@ # limitations under the License. # -version=0.11.14.BUILD-SNAPSHOT +version=0.11.13