diff --git a/build.gradle b/build.gradle
index e841a03f0..0b0e345c7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -34,7 +34,6 @@ subprojects {
ext['netty.version'] = '4.1.29.Final'
ext['netty-boringssl.version'] = '2.0.18.Final'
ext['hdrhistogram.version'] = '2.1.10'
- ext['jctool.version'] = '2.1.2'
ext['mockito.version'] = '2.23.0'
ext['slf4j.version'] = '1.7.25'
ext['jmh.version'] = '1.21'
@@ -56,7 +55,6 @@ subprojects {
dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}"
dependency "org.assertj:assertj-core:${ext['assertj.version']}"
dependency "org.hdrhistogram:HdrHistogram:${ext['hdrhistogram.version']}"
- dependency "org.jctools:jctools-core:${ext['jctool.version']}"
dependency "org.mockito:mockito-core:${ ext['mockito.version']}"
dependency "org.slf4j:slf4j-api:${ext['slf4j.version']}"
diff --git a/gradle.properties b/gradle.properties
index 618b0e31a..9fedf8f79 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -12,4 +12,4 @@
# limitations under the License.
#
-version=0.11.14.BUILD-SNAPSHOT
+version=0.11.13
diff --git a/rsocket-core/build.gradle b/rsocket-core/build.gradle
index 780222efd..d62452619 100644
--- a/rsocket-core/build.gradle
+++ b/rsocket-core/build.gradle
@@ -38,7 +38,6 @@ dependencies {
testImplementation 'org.mockito:mockito-core'
testRuntimeOnly 'ch.qos.logback:logback-classic'
- testRuntimeOnly 'org.jctools:jctools-core'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
// TODO: Remove after JUnit5 migration
diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java
index 385fba7c5..7a4d54a4f 100644
--- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java
+++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java
@@ -22,35 +22,34 @@
import io.rsocket.framing.FrameType;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
+import io.rsocket.internal.UnicastMonoProcessor;
+import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.SignalType;
-import reactor.core.publisher.UnicastProcessor;
+import reactor.core.publisher.*;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
class RSocketClient implements RSocket {
-
+
private final DuplexConnection connection;
private final Function frameDecoder;
private final Consumer errorConsumer;
private final StreamIdSupplier streamIdSupplier;
private final Map senders;
- private final Map> receivers;
+ private final Map> receivers;
private final UnboundedProcessor sendProcessor;
- private KeepAliveHandler keepAliveHandler;
private final Lifecycle lifecycle = new Lifecycle();
-
+ private KeepAliveHandler keepAliveHandler;
+
/*server requester*/
RSocketClient(
DuplexConnection connection,
@@ -60,7 +59,7 @@ class RSocketClient implements RSocket {
this(
connection, frameDecoder, errorConsumer, streamIdSupplier, Duration.ZERO, Duration.ZERO, 0);
}
-
+
/*client requester*/
RSocketClient(
DuplexConnection connection,
@@ -74,26 +73,26 @@ class RSocketClient implements RSocket {
this.frameDecoder = frameDecoder;
this.errorConsumer = errorConsumer;
this.streamIdSupplier = streamIdSupplier;
- this.senders = Collections.synchronizedMap(new IntObjectHashMap<>());
+ this.senders = Collections.synchronizedMap(new IntObjectHashMap<>());
this.receivers = Collections.synchronizedMap(new IntObjectHashMap<>());
-
+
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
this.sendProcessor = new UnboundedProcessor<>();
-
+
connection.onClose().doFinally(signalType -> terminate()).subscribe(null, errorConsumer);
-
+
connection
.send(sendProcessor)
.doFinally(this::handleSendProcessorCancel)
.subscribe(null, this::handleSendProcessorError);
-
+
connection.receive().subscribe(this::handleIncomingFrames, errorConsumer);
-
+
if (!Duration.ZERO.equals(tickPeriod)) {
this.keepAliveHandler =
KeepAliveHandler.ofClient(
new KeepAliveHandler.KeepAlive(tickPeriod, ackTimeout, missedAcks));
-
+
keepAliveHandler
.timeout()
.subscribe(
@@ -101,7 +100,7 @@ class RSocketClient implements RSocket {
String message =
String.format("No keep-alive acks for %d ms", keepAlive.getTimeoutMillis());
ConnectionErrorException err = new ConnectionErrorException(message);
- lifecycle.terminate(err);
+ lifecycle.setTerminationError(err);
errorConsumer.accept(err);
connection.dispose();
});
@@ -110,9 +109,9 @@ class RSocketClient implements RSocket {
keepAliveHandler = null;
}
}
-
+
private void handleSendProcessorError(Throwable t) {
- Throwable terminationError = lifecycle.terminationError();
+ Throwable terminationError = lifecycle.getTerminationError();
Throwable err = terminationError != null ? terminationError : t;
for (Subscriber subscriber : receivers.values()) {
try {
@@ -121,17 +120,17 @@ private void handleSendProcessorError(Throwable t) {
errorConsumer.accept(e);
}
}
-
+
for (LimitableRequestPublisher p : senders.values()) {
p.cancel();
}
}
-
+
private void handleSendProcessorCancel(SignalType t) {
if (SignalType.ON_ERROR == t) {
return;
}
-
+
for (Subscriber subscriber : receivers.values()) {
try {
subscriber.onError(new Throwable("closed connection"));
@@ -139,261 +138,265 @@ private void handleSendProcessorCancel(SignalType t) {
errorConsumer.accept(e);
}
}
-
+
for (LimitableRequestPublisher p : senders.values()) {
p.cancel();
}
}
-
+
@Override
public Mono fireAndForget(Payload payload) {
return handleFireAndForget(payload);
}
-
+
@Override
public Mono requestResponse(Payload payload) {
return handleRequestResponse(payload);
}
-
+
@Override
public Flux requestStream(Payload payload) {
return handleRequestStream(payload);
}
-
+
@Override
public Flux requestChannel(Publisher payloads) {
return handleChannel(Flux.from(payloads));
}
-
+
@Override
public Mono metadataPush(Payload payload) {
return handleMetadataPush(payload);
}
-
+
@Override
public double availability() {
return connection.availability();
}
-
+
@Override
public void dispose() {
connection.dispose();
}
-
+
@Override
public boolean isDisposed() {
return connection.isDisposed();
}
-
+
@Override
public Mono onClose() {
return connection.onClose();
}
-
+
private Mono handleFireAndForget(Payload payload) {
return lifecycle
- .active()
- .then(
- Mono.fromRunnable(
- () -> {
- final int streamId = streamIdSupplier.nextStreamId();
- final Frame requestFrame =
- Frame.Request.from(streamId, FrameType.REQUEST_FNF, payload, 1);
- payload.release();
- sendProcessor.onNext(requestFrame);
- }));
+ .active()
+ .then(
+ Mono.fromRunnable(
+ () -> {
+ final int streamId = streamIdSupplier.nextStreamId();
+ final Frame requestFrame =
+ Frame.Request.from(streamId, FrameType.REQUEST_FNF, payload, 1);
+ payload.release();
+ sendProcessor.onNext(requestFrame);
+ }));
}
-
+
private Flux handleRequestStream(final Payload payload) {
return lifecycle
- .active()
- .thenMany(
- Flux.defer(
- () -> {
- int streamId = streamIdSupplier.nextStreamId();
-
- UnicastProcessor receiver = UnicastProcessor.create();
- receivers.put(streamId, receiver);
-
- AtomicBoolean first = new AtomicBoolean(false);
-
- return receiver
- .doOnRequest(
- n -> {
- if (first.compareAndSet(false, true) && !receiver.isDisposed()) {
- final Frame requestFrame =
- Frame.Request.from(
- streamId, FrameType.REQUEST_STREAM, payload, n);
- payload.release();
- sendProcessor.onNext(requestFrame);
- } else if (contains(streamId) && !receiver.isDisposed()) {
- sendProcessor.onNext(Frame.RequestN.from(streamId, n));
- }
- sendProcessor.drain();
- })
- .doOnError(
- t -> {
- if (contains(streamId) && !receiver.isDisposed()) {
- sendProcessor.onNext(Frame.Error.from(streamId, t));
- }
- })
- .doOnCancel(
- () -> {
- if (contains(streamId) && !receiver.isDisposed()) {
- sendProcessor.onNext(Frame.Cancel.from(streamId));
- }
- })
- .doFinally(
- s -> {
- receivers.remove(streamId);
- });
- }));
+ .active()
+ .thenMany(
+ Flux.defer(
+ () -> {
+ int streamId = streamIdSupplier.nextStreamId();
+
+ UnicastProcessor receiver = UnicastProcessor.create();
+ receivers.put(streamId, receiver);
+
+ AtomicBoolean first = new AtomicBoolean(false);
+
+ return receiver
+ .doOnRequest(
+ n -> {
+ if (first.compareAndSet(false, true) && !receiver.isDisposed()) {
+ final Frame requestFrame =
+ Frame.Request.from(
+ streamId, FrameType.REQUEST_STREAM, payload, n);
+ payload.release();
+ sendProcessor.onNext(requestFrame);
+ } else if (contains(streamId) && !receiver.isDisposed()) {
+ sendProcessor.onNext(Frame.RequestN.from(streamId, n));
+ }
+ sendProcessor.drain();
+ })
+ .doOnError(
+ t -> {
+ if (contains(streamId) && !receiver.isDisposed()) {
+ sendProcessor.onNext(Frame.Error.from(streamId, t));
+ }
+ })
+ .doOnCancel(
+ () -> {
+ if (contains(streamId) && !receiver.isDisposed()) {
+ sendProcessor.onNext(Frame.Cancel.from(streamId));
+ }
+ })
+ .doFinally(
+ s -> {
+ receivers.remove(streamId);
+ });
+ }));
}
-
+
private Mono handleRequestResponse(final Payload payload) {
return lifecycle
- .active()
- .then(
- Mono.defer(
- () -> {
- int streamId = streamIdSupplier.nextStreamId();
- final Frame requestFrame =
- Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1);
- payload.release();
-
- UnicastProcessor receiver = UnicastProcessor.create();
- receivers.put(streamId, receiver);
-
- sendProcessor.onNext(requestFrame);
-
- return receiver
- .singleOrEmpty()
- .doOnError(t -> sendProcessor.onNext(Frame.Error.from(streamId, t)))
- .doOnCancel(() -> sendProcessor.onNext(Frame.Cancel.from(streamId)))
- .doFinally(
- s -> {
- receivers.remove(streamId);
- });
- }));
+ .active()
+ .then(
+ Mono.defer(
+ () -> {
+ int streamId = streamIdSupplier.nextStreamId();
+ final Frame requestFrame =
+ Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1);
+ payload.release();
+
+ UnicastMonoProcessor receiver = UnicastMonoProcessor.create();
+ receivers.put(streamId, receiver);
+
+ sendProcessor.onNext(requestFrame);
+
+ return receiver
+ .doOnError(t -> sendProcessor.onNext(Frame.Error.from(streamId, t)))
+ .doFinally(
+ s -> {
+ if (s == SignalType.CANCEL) {
+ sendProcessor.onNext(Frame.Cancel.from(streamId));
+ }
+
+ receivers.remove(streamId);
+ });
+ }));
}
-
+
private Flux handleChannel(Flux request) {
return lifecycle
- .active()
- .thenMany(
- Flux.defer(
- () -> {
- final UnicastProcessor receiver = UnicastProcessor.create();
- final int streamId = streamIdSupplier.nextStreamId();
- final AtomicBoolean firstRequest = new AtomicBoolean(true);
-
- return receiver
- .doOnRequest(
- n -> {
- if (firstRequest.compareAndSet(true, false)) {
- final AtomicBoolean firstPayload = new AtomicBoolean(true);
- final Flux requestFrames =
- request
- .transform(
- f -> {
- LimitableRequestPublisher wrapped =
- LimitableRequestPublisher.wrap(f);
- // Need to set this to one for first the frame
- wrapped.increaseRequestLimit(1);
- senders.put(streamId, wrapped);
- receivers.put(streamId, receiver);
-
- return wrapped;
- })
- .map(
- payload -> {
- final Frame requestFrame;
- if (firstPayload.compareAndSet(true, false)) {
- requestFrame =
- Frame.Request.from(
- streamId,
- FrameType.REQUEST_CHANNEL,
- payload,
- n);
- } else {
- requestFrame =
- Frame.PayloadFrame.from(
- streamId, FrameType.NEXT, payload);
- }
- payload.release();
- return requestFrame;
- })
- .doOnComplete(
- () -> {
+ .active()
+ .thenMany(
+ Flux.defer(
+ () -> {
+ final UnicastProcessor receiver = UnicastProcessor.create();
+ final int streamId = streamIdSupplier.nextStreamId();
+ final AtomicBoolean firstRequest = new AtomicBoolean(true);
+
+ return receiver
+ .doOnRequest(
+ n -> {
+ if (firstRequest.compareAndSet(true, false)) {
+ final AtomicBoolean firstPayload = new AtomicBoolean(true);
+ final Flux requestFrames =
+ request
+ .transform(
+ f -> {
+ LimitableRequestPublisher wrapped =
+ LimitableRequestPublisher.wrap(f);
+ // Need to set this to one for first the frame
+ wrapped.increaseRequestLimit(1);
+ senders.put(streamId, wrapped);
+ receivers.put(streamId, receiver);
+
+ return wrapped;
+ })
+ .map(
+ payload -> {
+ final Frame requestFrame;
+ if (firstPayload.compareAndSet(true, false)) {
+ requestFrame =
+ Frame.Request.from(
+ streamId,
+ FrameType.REQUEST_CHANNEL,
+ payload,
+ n);
+ } else {
+ requestFrame =
+ Frame.PayloadFrame.from(
+ streamId, FrameType.NEXT, payload);
+ }
+ payload.release();
+ return requestFrame;
+ })
+ .doOnComplete(
+ () -> {
+ if (contains(streamId) && !receiver.isDisposed()) {
+ sendProcessor.onNext(
+ Frame.PayloadFrame.from(
+ streamId, FrameType.COMPLETE));
+ }
+ if (firstPayload.get()) {
+ receiver.onComplete();
+ }
+ });
+
+ requestFrames.subscribe(
+ sendProcessor::onNext,
+ t -> {
+ errorConsumer.accept(t);
+ receiver.dispose();
+ });
+ } else {
if (contains(streamId) && !receiver.isDisposed()) {
- sendProcessor.onNext(
- Frame.PayloadFrame.from(
- streamId, FrameType.COMPLETE));
+ sendProcessor.onNext(Frame.RequestN.from(streamId, n));
}
- if (firstPayload.get()) {
- receiver.onComplete();
- }
- });
-
- requestFrames.subscribe(
- sendProcessor::onNext,
- t -> {
- errorConsumer.accept(t);
- receiver.dispose();
- });
- } else {
- if (contains(streamId) && !receiver.isDisposed()) {
- sendProcessor.onNext(Frame.RequestN.from(streamId, n));
- }
- }
- })
- .doOnError(
- t -> {
- if (contains(streamId) && !receiver.isDisposed()) {
- sendProcessor.onNext(Frame.Error.from(streamId, t));
- }
- })
- .doOnCancel(
- () -> {
- if (contains(streamId) && !receiver.isDisposed()) {
- sendProcessor.onNext(Frame.Cancel.from(streamId));
- }
- })
- .doFinally(
- s -> {
- receivers.remove(streamId);
- LimitableRequestPublisher sender = senders.remove(streamId);
- if (sender != null) {
- sender.cancel();
- }
- });
- }));
+ }
+ })
+ .doOnError(
+ t -> {
+ if (contains(streamId) && !receiver.isDisposed()) {
+ sendProcessor.onNext(Frame.Error.from(streamId, t));
+ }
+ })
+ .doOnCancel(
+ () -> {
+ if (contains(streamId) && !receiver.isDisposed()) {
+ sendProcessor.onNext(Frame.Cancel.from(streamId));
+ }
+ })
+ .doFinally(
+ s -> {
+ receivers.remove(streamId);
+ LimitableRequestPublisher sender = senders.remove(streamId);
+ if (sender != null) {
+ sender.cancel();
+ }
+ });
+ }));
}
-
+
private Mono handleMetadataPush(Payload payload) {
return lifecycle
- .active()
- .then(Mono.fromRunnable(
- () -> {
- final Frame requestFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1);
- payload.release();
- sendProcessor.onNext(requestFrame);
- }));
+ .active()
+ .then(
+ Mono.fromRunnable(
+ () -> {
+ final Frame requestFrame =
+ Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1);
+ payload.release();
+ sendProcessor.onNext(requestFrame);
+ }));
}
-
+
private boolean contains(int streamId) {
return receivers.containsKey(streamId);
}
-
+
protected void terminate() {
-
- lifecycle.terminate(new ClosedChannelException());
-
+
+ lifecycle.setTerminationError(new ClosedChannelException());
+
if (keepAliveHandler != null) {
keepAliveHandler.dispose();
}
try {
- for (UnicastProcessor subscriber : receivers.values()) {
+ for (Processor subscriber : receivers.values()) {
cleanUpSubscriber(subscriber);
}
for (LimitableRequestPublisher p : senders.values()) {
@@ -405,7 +408,7 @@ protected void terminate() {
sendProcessor.dispose();
}
}
-
+
private synchronized void cleanUpLimitableRequestPublisher(
LimitableRequestPublisher> limitableRequestPublisher) {
try {
@@ -414,15 +417,15 @@ private synchronized void cleanUpLimitableRequestPublisher(
errorConsumer.accept(t);
}
}
-
- private synchronized void cleanUpSubscriber(UnicastProcessor> subscriber) {
+
+ private synchronized void cleanUpSubscriber(Processor subscriber) {
try {
- subscriber.onError(lifecycle.terminationError());
+ subscriber.onError(lifecycle.getTerminationError());
} catch (Throwable t) {
errorConsumer.accept(t);
}
}
-
+
private void handleIncomingFrames(Frame frame) {
try {
int streamId = frame.getStreamId();
@@ -436,12 +439,12 @@ private void handleIncomingFrames(Frame frame) {
frame.release();
}
}
-
+
private void handleStreamZero(FrameType type, Frame frame) {
switch (type) {
case ERROR:
RuntimeException error = Exceptions.from(frame);
- lifecycle.terminate(error);
+ lifecycle.setTerminationError(error);
errorConsumer.accept(error);
connection.dispose();
break;
@@ -459,7 +462,7 @@ private void handleStreamZero(FrameType type, Frame frame) {
"Client received supported frame on stream 0: " + frame.toString()));
}
}
-
+
private void handleFrame(int streamId, FrameType type, Frame frame) {
Subscriber receiver = receivers.get(streamId);
if (receiver == null) {
@@ -475,27 +478,27 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
receiver.onComplete();
break;
case CANCEL:
- {
- LimitableRequestPublisher sender = senders.remove(streamId);
- receivers.remove(streamId);
- if (sender != null) {
- sender.cancel();
- }
- break;
+ {
+ LimitableRequestPublisher sender = senders.remove(streamId);
+ receivers.remove(streamId);
+ if (sender != null) {
+ sender.cancel();
}
+ break;
+ }
case NEXT:
receiver.onNext(frameDecoder.apply(frame));
break;
case REQUEST_N:
- {
- LimitableRequestPublisher sender = senders.get(streamId);
- if (sender != null) {
- int n = Frame.RequestN.requestN(frame);
- sender.increaseRequestLimit(n);
- sendProcessor.drain();
- }
- break;
+ {
+ LimitableRequestPublisher sender = senders.get(streamId);
+ if (sender != null) {
+ int n = Frame.RequestN.requestN(frame);
+ sender.increaseRequestLimit(n);
+ sendProcessor.drain();
}
+ break;
+ }
case COMPLETE:
receiver.onComplete();
receivers.remove(streamId);
@@ -506,14 +509,14 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
}
}
}
-
+
private void handleMissingResponseProcessor(int streamId, FrameType type, Frame frame) {
if (!streamIdSupplier.isBeforeOrCurrent(streamId)) {
if (type == FrameType.ERROR) {
// message for stream that has never existed, we have a problem with
// the overall connection and must tear down
String errorMessage = frame.getDataUtf8();
-
+
throw new IllegalStateException(
"Client received error for non-existent stream: "
+ streamId
@@ -530,29 +533,31 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame
// receiving a frame after a given stream has been cancelled/completed,
// so ignore (cancellation is async so there is a race condition)
}
-
+
private static class Lifecycle {
-
- private final AtomicReference terminationError = new AtomicReference<>();
-
+
+ private static final AtomicReferenceFieldUpdater TERMINATION_ERROR =
+ AtomicReferenceFieldUpdater.newUpdater(
+ Lifecycle.class, Throwable.class, "terminationError");
+ private volatile Throwable terminationError;
+
public Mono active() {
return Mono.create(
sink -> {
- Throwable err = terminationError();
- if (err == null) {
+ if (terminationError == null) {
sink.success();
} else {
- sink.error(err);
+ sink.error(terminationError);
}
});
}
-
- public void terminate(Throwable err) {
- this.terminationError.compareAndSet(null, err);
+
+ public void setTerminationError(Throwable err) {
+ TERMINATION_ERROR.compareAndSet(this, null, err);
}
-
- public Throwable terminationError() {
- return terminationError.get();
+
+ public Throwable getTerminationError() {
+ return terminationError;
}
}
}
diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java
index bc0bfcf17..3b47d59e3 100644
--- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java
+++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java
@@ -22,14 +22,12 @@
import io.rsocket.framing.FrameType;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
+import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.SignalType;
-import reactor.core.publisher.UnicastProcessor;
+import reactor.core.publisher.*;
import java.util.Collections;
import java.util.Map;
@@ -42,18 +40,18 @@
/** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */
class RSocketServer implements RSocket {
-
+
private final DuplexConnection connection;
private final RSocket requestHandler;
private final Function frameDecoder;
private final Consumer errorConsumer;
-
+
private final Map sendingSubscriptions;
- private final Map> channelProcessors;
-
+ private final Map> channelProcessors;
+
private final UnboundedProcessor sendProcessor;
private KeepAliveHandler keepAliveHandler;
-
+
/*client responder*/
RSocketServer(
DuplexConnection connection,
@@ -62,7 +60,7 @@ class RSocketServer implements RSocket {
Consumer errorConsumer) {
this(connection, requestHandler, frameDecoder, errorConsumer, 0, 0);
}
-
+
/*server responder*/
RSocketServer(
DuplexConnection connection,
@@ -77,18 +75,18 @@ class RSocketServer implements RSocket {
this.errorConsumer = errorConsumer;
this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>());
-
+
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
// connections
this.sendProcessor = new UnboundedProcessor<>();
-
+
connection
.send(sendProcessor)
.doFinally(this::handleSendProcessorCancel)
.subscribe(null, this::handleSendProcessorError);
-
+
Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer);
-
+
this.connection
.onClose()
.doFinally(
@@ -97,11 +95,11 @@ class RSocketServer implements RSocket {
receiveDisposable.dispose();
})
.subscribe(null, errorConsumer);
-
+
if (tickPeriod != 0) {
keepAliveHandler =
KeepAliveHandler.ofServer(new KeepAliveHandler.KeepAlive(tickPeriod, ackTimeout));
-
+
keepAliveHandler
.timeout()
.subscribe(
@@ -116,7 +114,7 @@ class RSocketServer implements RSocket {
keepAliveHandler = null;
}
}
-
+
private void handleSendProcessorError(Throwable t) {
for (Subscription subscription : sendingSubscriptions.values()) {
try {
@@ -125,21 +123,21 @@ private void handleSendProcessorError(Throwable t) {
errorConsumer.accept(e);
}
}
-
- for (UnicastProcessor subscription : channelProcessors.values()) {
+
+ for (Processor subscription : channelProcessors.values()) {
try {
- subscription.cancel();
+ subscription.onError(t);
} catch (Throwable e) {
errorConsumer.accept(e);
}
}
}
-
+
private void handleSendProcessorCancel(SignalType t) {
if (SignalType.ON_ERROR == t) {
return;
}
-
+
for (Subscription subscription : sendingSubscriptions.values()) {
try {
subscription.cancel();
@@ -147,16 +145,16 @@ private void handleSendProcessorCancel(SignalType t) {
errorConsumer.accept(e);
}
}
-
- for (UnicastProcessor subscription : channelProcessors.values()) {
+
+ for (Processor subscription : channelProcessors.values()) {
try {
- subscription.cancel();
+ subscription.onComplete();
} catch (Throwable e) {
errorConsumer.accept(e);
}
}
}
-
+
@Override
public Mono fireAndForget(Payload payload) {
try {
@@ -165,7 +163,7 @@ public Mono fireAndForget(Payload payload) {
return Mono.error(t);
}
}
-
+
@Override
public Mono requestResponse(Payload payload) {
try {
@@ -174,7 +172,7 @@ public Mono requestResponse(Payload payload) {
return Mono.error(t);
}
}
-
+
@Override
public Flux requestStream(Payload payload) {
try {
@@ -183,7 +181,7 @@ public Flux requestStream(Payload payload) {
return Flux.error(t);
}
}
-
+
@Override
public Flux requestChannel(Publisher payloads) {
try {
@@ -192,7 +190,7 @@ public Flux requestChannel(Publisher payloads) {
return Flux.error(t);
}
}
-
+
@Override
public Mono metadataPush(Payload payload) {
try {
@@ -201,43 +199,45 @@ public Mono metadataPush(Payload payload) {
return Mono.error(t);
}
}
-
+
@Override
public void dispose() {
connection.dispose();
}
-
+
@Override
public boolean isDisposed() {
return connection.isDisposed();
}
-
+
@Override
public Mono onClose() {
return connection.onClose();
}
-
+
private void cleanup() {
if (keepAliveHandler != null) {
keepAliveHandler.dispose();
}
cleanUpSendingSubscriptions();
cleanUpChannelProcessors();
-
+
requestHandler.dispose();
sendProcessor.dispose();
}
-
+
private synchronized void cleanUpSendingSubscriptions() {
sendingSubscriptions.values().forEach(Subscription::cancel);
sendingSubscriptions.clear();
}
-
+
private synchronized void cleanUpChannelProcessors() {
- channelProcessors.values().forEach(Subscription::cancel);
+ channelProcessors
+ .values()
+ .forEach(Processor::onComplete);
channelProcessors.clear();
}
-
+
private void handleFrame(Frame frame) {
try {
int streamId = frame.getStreamId();
@@ -313,14 +313,14 @@ private void handleFrame(Frame frame) {
frame.release();
}
}
-
+
private void handleFireAndForget(int streamId, Mono result) {
result
.doOnSubscribe(subscription -> sendingSubscriptions.put(streamId, subscription))
.doFinally(signalType -> sendingSubscriptions.remove(streamId))
.subscribe(null, errorConsumer);
}
-
+
private void handleRequestResponse(int streamId, Mono response) {
response
.doOnSubscribe(subscription -> sendingSubscriptions.put(streamId, subscription))
@@ -340,7 +340,7 @@ private void handleRequestResponse(int streamId, Mono response) {
.doFinally(signalType -> sendingSubscriptions.remove(streamId))
.subscribe(sendProcessor::onNext, t -> handleError(streamId, t));
}
-
+
private void handleStream(int streamId, Flux response, int initialRequestN) {
response
.transform(
@@ -364,44 +364,44 @@ private void handleStream(int streamId, Flux response, int initialReque
sendProcessor.onNext(frame);
});
}
-
+
private void handleChannel(int streamId, Payload payload, int initialRequestN) {
UnicastProcessor frames = UnicastProcessor.create();
channelProcessors.put(streamId, frames);
-
+
Flux payloads =
frames
.doOnCancel(() -> sendProcessor.onNext(Frame.Cancel.from(streamId)))
.doOnError(t -> sendProcessor.onNext(Frame.Error.from(streamId, t)))
.doOnRequest(l -> sendProcessor.onNext(Frame.RequestN.from(streamId, l)))
.doFinally(signalType -> channelProcessors.remove(streamId));
-
+
// not chained, as the payload should be enqueued in the Unicast processor before this method
// returns
// and any later payload can be processed
frames.onNext(payload);
-
+
handleStream(streamId, requestChannel(payloads), initialRequestN);
}
-
+
private void handleKeepAliveFrame(Frame frame) {
if (keepAliveHandler != null) {
keepAliveHandler.receive(frame);
}
}
-
+
private void handleCancelFrame(int streamId) {
Subscription subscription = sendingSubscriptions.remove(streamId);
if (subscription != null) {
subscription.cancel();
}
}
-
+
private void handleError(int streamId, Throwable t) {
errorConsumer.accept(t);
sendProcessor.onNext(Frame.Error.from(streamId, t));
}
-
+
private void handleRequestN(int streamId, Frame frame) {
final Subscription subscription = sendingSubscriptions.get(streamId);
if (subscription != null) {
diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java
index 0ac558ce5..fd459fe03 100644
--- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java
+++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java
@@ -19,7 +19,6 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.collection.IntObjectHashMap;
-import io.netty.util.collection.LongObjectHashMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.util.AbstractionLeakingFrameUtils;
@@ -90,12 +89,10 @@ public FragmentationDuplexConnection(
.doFinally(
signalType -> {
Collection values;
- synchronized (this) {
+ synchronized (FragmentationDuplexConnection.this) {
values = frameReassemblers.values();
}
- for (FrameReassembler reassembler : values) {
- reassembler.dispose();
- }
+ values.forEach(FrameReassembler::dispose);
})
.subscribe();
}
@@ -146,11 +143,10 @@ private Flux toFragmentedFrames(int streamId, io.rsocket.framing.Frame fr
private Mono toReassembledFrames(int streamId, io.rsocket.framing.Frame fragment) {
FrameReassembler frameReassembler;
-
synchronized (this) {
frameReassembler =
frameReassemblers.computeIfAbsent(
- streamId, i -> createFrameReassembler(byteBufAllocator));
+ streamId, i -> createFrameReassembler(byteBufAllocator));
}
return Mono.justOrEmpty(frameReassembler.reassemble(fragment))
diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java
index 341de4608..bcfa77287 100644
--- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java
+++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java
@@ -17,6 +17,7 @@
package io.rsocket.internal;
import io.netty.util.ReferenceCountUtil;
+import io.netty.util.internal.shaded.org.jctools.queues.MpscUnboundedArrayQueue;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
@@ -43,32 +44,24 @@
public final class UnboundedProcessor extends FluxProcessor
implements Fuseable.QueueSubscription, Fuseable {
- final Queue queue;
-
- volatile boolean done;
- Throwable error;
-
- volatile CoreSubscriber super T> actual;
-
- volatile boolean cancelled;
-
- volatile int once;
-
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater ONCE =
AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "once");
-
- volatile int wip;
-
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater WIP =
AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "wip");
-
- volatile long requested;
-
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater REQUESTED =
AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested");
+ final Queue queue;
+ volatile boolean done;
+ Throwable error;
+ volatile CoreSubscriber super T> actual;
+ volatile boolean cancelled;
+ volatile int once;
+ volatile int wip;
+ volatile long requested;
+ volatile boolean outputFused;
public UnboundedProcessor() {
this.queue = Queues.unboundedMultiproducer().get();
@@ -130,20 +123,60 @@ void drainRegular(Subscriber super T> a) {
}
}
}
-
+
+ void drainFused(Subscriber super T> a) {
+ int missed = 1;
+
+ final Queue q = queue;
+
+ for (;;) {
+
+ if (cancelled) {
+ q.clear();
+ actual = null;
+ return;
+ }
+
+ boolean d = done;
+
+ a.onNext(null);
+
+ if (d) {
+ actual = null;
+
+ Throwable ex = error;
+ if (ex != null) {
+ a.onError(ex);
+ } else {
+ a.onComplete();
+ }
+ return;
+ }
+
+ missed = WIP.addAndGet(this, -missed);
+ if (missed == 0) {
+ break;
+ }
+ }
+ }
+
+
public void drain() {
if (WIP.getAndIncrement(this) != 0) {
return;
}
int missed = 1;
-
+
for (; ; ) {
Subscriber super T> a = actual;
if (a != null) {
-
- drainRegular(a);
-
+
+ if (outputFused) {
+ drainFused(a);
+ } else {
+ drainRegular(a);
+ }
return;
}
@@ -281,6 +314,11 @@ public void cancel() {
}
}
+ @Override
+ public T peek() {
+ return queue.peek();
+ }
+
@Override
@Nullable
public T poll() {
@@ -306,12 +344,16 @@ public void clear() {
}
}
}
-
+
@Override
public int requestFusion(int requestedMode) {
+ if ((requestedMode & Fuseable.ASYNC) != 0) {
+ outputFused = true;
+ return Fuseable.ASYNC;
+ }
return Fuseable.NONE;
}
-
+
@Override
public void dispose() {
cancel();
diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java
new file mode 100644
index 000000000..47849c4c0
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java
@@ -0,0 +1,191 @@
+package io.rsocket.internal;
+
+import org.reactivestreams.Processor;
+import org.reactivestreams.Subscription;
+import reactor.core.CoreSubscriber;
+import reactor.core.Disposable;
+import reactor.core.Scannable;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+import reactor.core.publisher.Operators;
+import reactor.util.annotation.Nullable;
+import reactor.util.context.Context;
+import reactor.util.function.Tuple2;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.LongSupplier;
+import java.util.stream.Stream;
+
+public class UnicastMonoProcessor extends Mono
+ implements Processor,
+ CoreSubscriber,
+ Disposable,
+ Subscription,
+ Scannable,
+ LongSupplier {
+
+ @SuppressWarnings("rawtypes")
+ static final AtomicIntegerFieldUpdater ONCE =
+ AtomicIntegerFieldUpdater.newUpdater(UnicastMonoProcessor.class, "once");
+
+ private final MonoProcessor processor;
+
+ @SuppressWarnings("unused")
+ private volatile int once;
+
+ private UnicastMonoProcessor() {
+ this.processor = MonoProcessor.create();
+ }
+
+ public static UnicastMonoProcessor create() {
+ return new UnicastMonoProcessor<>();
+ }
+
+ @Override
+ public Stream extends Scannable> actuals() {
+ return processor.actuals();
+ }
+
+ @Override
+ public boolean isScanAvailable() {
+ return processor.isScanAvailable();
+ }
+
+ @Override
+ public String name() {
+ return processor.name();
+ }
+
+ @Override
+ public String stepName() {
+ return processor.stepName();
+ }
+
+ @Override
+ public Stream steps() {
+ return processor.steps();
+ }
+
+ @Override
+ public Stream extends Scannable> parents() {
+ return processor.parents();
+ }
+
+ @Override
+ @Nullable
+ public T scan(Attr key) {
+ return processor.scan(key);
+ }
+
+ @Override
+ public T scanOrDefault(Attr key, T defaultValue) {
+ return processor.scanOrDefault(key, defaultValue);
+ }
+
+ @Override
+ public Stream> tags() {
+ return processor.tags();
+ }
+
+ @Override
+ public long getAsLong() {
+ return processor.getAsLong();
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ processor.onSubscribe(s);
+ }
+
+ @Override
+ public void onNext(O o) {
+ processor.onNext(o);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ processor.onError(t);
+ }
+
+ @Nullable
+ public Throwable getError() {
+ return processor.getError();
+ }
+
+ public boolean isCancelled() {
+ return processor.isCancelled();
+ }
+
+ public boolean isError() {
+ return processor.isError();
+ }
+
+ public boolean isSuccess() {
+ return processor.isSuccess();
+ }
+
+ public boolean isTerminated() {
+ return processor.isTerminated();
+ }
+
+ @Nullable
+ public O peek() {
+ return processor.peek();
+ }
+
+ public long downstreamCount() {
+ return processor.downstreamCount();
+ }
+
+ public boolean hasDownstreams() {
+ return processor.hasDownstreams();
+ }
+
+ @Override
+ public void onComplete() {
+ processor.onComplete();
+ }
+
+ @Override
+ public void request(long n) {
+ processor.request(n);
+ }
+
+ @Override
+ public void cancel() {
+ processor.cancel();
+ }
+
+ @Override
+ public void dispose() {
+ processor.dispose();
+ }
+
+ @Override
+ public Context currentContext() {
+ return processor.currentContext();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return processor.isDisposed();
+ }
+
+ @Override
+ public Object scanUnsafe(Attr key) {
+ return processor.scanUnsafe(key);
+ }
+
+ @Override
+ public void subscribe(CoreSubscriber super O> actual) {
+ Objects.requireNonNull(actual, "subscribe");
+ if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
+ processor.subscribe(actual);
+ } else {
+ Operators.error(
+ actual,
+ new IllegalStateException("UnicastMonoProcessor allows only a single Subscriber"));
+ }
+ }
+}
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java
index 29c5f7ced..c8d57de30 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java
@@ -10,6 +10,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
+import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;
@@ -27,20 +28,23 @@ class SendPublisher extends Flux {
private static final int MAX_SIZE = Queues.SMALL_BUFFER_SIZE;
private static final int REFILL_SIZE = MAX_SIZE / 2;
- private static final AtomicReferenceFieldUpdater
- INNER_SUBSCRIBER =
- AtomicReferenceFieldUpdater.newUpdater(
- SendPublisher.class, Object.class, "innerSubscriber");
+ private static final AtomicReferenceFieldUpdater INNER_SUBSCRIBER =
+ AtomicReferenceFieldUpdater.newUpdater(SendPublisher.class, Object.class, "innerSubscriber");
+ private static final AtomicIntegerFieldUpdater TERMINATED =
+ AtomicIntegerFieldUpdater.newUpdater(SendPublisher.class, "terminated");
private final Publisher source;
private final Channel channel;
private final EventLoop eventLoop;
- private final Queue queue;
- private final AtomicBoolean terminated = new AtomicBoolean();
+ private final Queue queue;
private final AtomicBoolean completed = new AtomicBoolean();
private final Function transformer;
private final SizeOf sizeOf;
-
+
+ @SuppressWarnings("unused")
+ private volatile int terminated;
+
private int pending;
+
@SuppressWarnings("unused")
private volatile int wip;
@@ -51,18 +55,31 @@ class SendPublisher extends Flux {
private long requestedUpstream = MAX_SIZE;
+ private boolean fuse;
+
@SuppressWarnings("unchecked")
SendPublisher(
Publisher source, Channel channel, Function transformer, SizeOf sizeOf) {
+ this(Queues.small().get(), source, channel, transformer, sizeOf);
+ }
+
+ @SuppressWarnings("unchecked")
+ SendPublisher(
+ Queue queue,
+ Publisher source,
+ Channel channel,
+ Function transformer,
+ SizeOf sizeOf) {
this.source = source;
this.channel = channel;
- this.queue = Queues.small().get();
+ this.queue = queue;
this.eventLoop = channel.eventLoop();
this.transformer = transformer;
this.sizeOf = sizeOf;
+
+ fuse = queue instanceof Fuseable.QueueSubscription;
}
- @SuppressWarnings("unchecked")
private ChannelPromise writeCleanupPromise(V poll) {
return channel
.newPromise()
@@ -90,9 +107,9 @@ private void tryComplete(InnerSubscriber is) {
if (pending == 0
&& completed.get()
&& queue.isEmpty()
- && !terminated.get()
+ && terminated == 0
&& !is.pendingFlush.get()) {
- terminated.set(true);
+ TERMINATED.set(SendPublisher.this, 1);
is.destination.onComplete();
}
}
@@ -101,12 +118,13 @@ private void tryComplete(InnerSubscriber is) {
public void subscribe(CoreSubscriber super Frame> destination) {
InnerSubscriber innerSubscriber = new InnerSubscriber(destination);
if (!INNER_SUBSCRIBER.compareAndSet(this, null, innerSubscriber)) {
- throw new IllegalStateException("SendPublisher only allows one subscription");
+ Operators.error(
+ destination, new IllegalStateException("SendPublisher only allows one subscription"));
+ } else {
+ InnerSubscription innerSubscription = new InnerSubscription(innerSubscriber);
+ destination.onSubscribe(innerSubscription);
+ source.subscribe(innerSubscriber);
}
-
- InnerSubscription innerSubscription = new InnerSubscription(innerSubscriber);
- destination.onSubscribe(innerSubscription);
- source.subscribe(innerSubscriber);
}
@FunctionalInterface
@@ -132,8 +150,8 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(Frame t) {
- if (!terminated.get()) {
- if (!queue.offer(transformer.apply(t))) {
+ if (terminated == 0) {
+ if (!fuse && !queue.offer(t)) {
throw new IllegalStateException("missing back pressure");
}
tryDrain();
@@ -142,7 +160,7 @@ public void onNext(Frame t) {
@Override
public void onError(Throwable t) {
- if (terminated.compareAndSet(false, true)) {
+ if (TERMINATED.compareAndSet(SendPublisher.this, 0, 1)) {
try {
s.cancel();
destination.onError(t);
@@ -180,7 +198,7 @@ private void flush() {
}
private void tryDrain() {
- if (wip == 0 && !terminated.get() && WIP.getAndIncrement(SendPublisher.this) == 0) {
+ if (wip == 0 && terminated == 0 && WIP.getAndIncrement(SendPublisher.this) == 0) {
try {
if (eventLoop.inEventLoop()) {
drain();
@@ -202,8 +220,9 @@ private void drain() {
long r = Math.min(requested, requestedUpstream);
while (r-- > 0) {
- V poll = queue.poll();
- if (poll != null && !terminated.get()) {
+ Frame frame = queue.poll();
+ if (frame != null && terminated == 0) {
+ V poll = transformer.apply(frame);
int readableBytes = sizeOf.size(poll);
pending++;
if (channel.isWritable() && readableBytes <= channel.bytesBeforeUnwritable()) {
@@ -225,7 +244,7 @@ private void drain() {
eventLoop.execute(this::flush);
}
- if (terminated.get()) {
+ if (terminated == 1) {
break;
}
@@ -259,7 +278,13 @@ public void request(long n) {
@Override
public void cancel() {
- terminated.set(true);
+ TERMINATED.set(SendPublisher.this, 1);
+ while (!queue.isEmpty()) {
+ Frame poll = queue.poll();
+ if (poll != null) {
+ ReferenceCountUtil.safeRelease(poll);
+ }
+ }
}
}
}
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java
index 09fd07325..4f6fa3086 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java
@@ -21,16 +21,18 @@
import io.rsocket.Frame;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
+import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
import java.util.Objects;
+import java.util.Queue;
/** An implementation of {@link DuplexConnection} that connects via TCP. */
public final class TcpDuplexConnection implements DuplexConnection {
-
+
private final Connection connection;
private final Disposable channelClosed;
/**
@@ -50,44 +52,57 @@ public TcpDuplexConnection(Connection connection) {
})
.subscribe();
}
-
+
@Override
public void dispose() {
connection.dispose();
}
-
+
@Override
public boolean isDisposed() {
return connection.isDisposed();
}
-
+
@Override
public Mono onClose() {
return connection
- .onDispose()
- .doFinally(
- s -> {
- if (!channelClosed.isDisposed()) {
- channelClosed.dispose();
- }
- });
+ .onDispose()
+ .doFinally(
+ s -> {
+ if (!channelClosed.isDisposed()) {
+ channelClosed.dispose();
+ }
+ });
}
-
+
@Override
public Flux receive() {
return connection.inbound().receive().map(buf -> Frame.from(buf.retain()));
}
-
+
@Override
public Mono send(Publisher frames) {
return Flux.from(frames)
- .transform(
- frameFlux ->
- new SendPublisher<>(
- frameFlux,
- connection.channel(),
- frame -> frame.content().retain(),
- ByteBuf::readableBytes))
- .then();
+ .transform(
+ frameFlux -> {
+ if (frameFlux instanceof Fuseable.QueueSubscription) {
+ Fuseable.QueueSubscription queueSubscription =
+ (Fuseable.QueueSubscription) frameFlux;
+ queueSubscription.requestFusion(Fuseable.ASYNC);
+ return new SendPublisher<>(
+ queueSubscription,
+ frameFlux,
+ connection.channel(),
+ frame -> frame.content().retain(),
+ ByteBuf::readableBytes);
+ } else {
+ return new SendPublisher<>(
+ frameFlux,
+ connection.channel(),
+ frame -> frame.content().retain(),
+ ByteBuf::readableBytes);
+ }
+ })
+ .then();
}
}
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java
index 4154dc630..f2d79e198 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java
@@ -23,10 +23,12 @@
import io.rsocket.frame.FrameHeaderFlyweight;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
+import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
+import reactor.util.concurrent.Queues;
import java.util.Objects;
@@ -41,79 +43,93 @@
* back on for frames received.
*/
public final class WebsocketDuplexConnection implements DuplexConnection {
-
- private final Connection connection;
- private final Disposable channelClosed;
-
- /**
- * Creates a new instance
- *
- * @param connection the {@link Connection} to for managing the server
- */
- public WebsocketDuplexConnection(Connection connection) {
- this.connection = Objects.requireNonNull(connection, "connection must not be null");
- this.channelClosed =
- FutureMono.from(connection.channel().closeFuture())
- .doFinally(
- s -> {
- if (!isDisposed()) {
- dispose();
- }
- })
- .subscribe();
- }
-
- @Override
- public void dispose() {
- connection.dispose();
- }
-
- @Override
- public boolean isDisposed() {
- return connection.isDisposed();
- }
-
- @Override
- public Mono onClose() {
- return connection
- .onDispose()
- .doFinally(
- s -> {
- if (!channelClosed.isDisposed()) {
- channelClosed.dispose();
- }
- });
- }
-
- @Override
- public Flux receive() {
- return connection
- .inbound()
- .receive()
- .map(
- buf -> {
- CompositeByteBuf composite = connection.channel().alloc().compositeBuffer();
- ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]);
- FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes());
- composite.addComponents(true, length, buf.retain());
- return Frame.from(composite);
- });
- }
-
- @Override
- public Mono send(Publisher frames) {
- return Flux.from(frames)
- .transform(
- frameFlux ->
- new SendPublisher<>(
- frameFlux,
- connection.channel(),
- this::toBinaryWebSocketFrame,
- binaryWebSocketFrame -> binaryWebSocketFrame.content().readableBytes()))
- .then();
- }
-
- private BinaryWebSocketFrame toBinaryWebSocketFrame(Frame frame) {
- return new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE).retain());
- }
+
+ private final Connection connection;
+ private final Disposable channelClosed;
+
+ /**
+ * Creates a new instance
+ *
+ * @param connection the {@link Connection} to for managing the server
+ */
+ public WebsocketDuplexConnection(Connection connection) {
+ this.connection = Objects.requireNonNull(connection, "connection must not be null");
+ this.channelClosed =
+ FutureMono.from(connection.channel().closeFuture())
+ .doFinally(
+ s -> {
+ if (!isDisposed()) {
+ dispose();
+ }
+ })
+ .subscribe();
+ }
+
+ @Override
+ public void dispose() {
+ connection.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return connection.isDisposed();
+ }
+
+ @Override
+ public Mono onClose() {
+ return connection
+ .onDispose()
+ .doFinally(
+ s -> {
+ if (!channelClosed.isDisposed()) {
+ channelClosed.dispose();
+ }
+ });
+ }
+
+ @Override
+ public Flux receive() {
+ return connection
+ .inbound()
+ .receive()
+ .map(
+ buf -> {
+ CompositeByteBuf composite = connection.channel().alloc().compositeBuffer();
+ ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]);
+ FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes());
+ composite.addComponents(true, length, buf.retain());
+ return Frame.from(composite);
+ });
+ }
+
+ @Override
+ public Mono send(Publisher frames) {
+ return Flux.from(frames)
+ .transform(
+ frameFlux -> {
+ if (frameFlux instanceof Fuseable.QueueSubscription) {
+ Fuseable.QueueSubscription queueSubscription =
+ (Fuseable.QueueSubscription) frameFlux;
+ queueSubscription.requestFusion(Fuseable.ASYNC);
+ return new SendPublisher<>(
+ queueSubscription,
+ frameFlux,
+ connection.channel(),
+ this::toBinaryWebSocketFrame,
+ binaryWebSocketFrame -> binaryWebSocketFrame.content().readableBytes());
+ } else {
+ return new SendPublisher<>(
+ Queues.small().get(),
+ frameFlux,
+ connection.channel(),
+ this::toBinaryWebSocketFrame,
+ binaryWebSocketFrame -> binaryWebSocketFrame.content().readableBytes());
+ }
+ })
+ .then();
+ }
+
+ private BinaryWebSocketFrame toBinaryWebSocketFrame(Frame frame) {
+ return new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE).retain());
+ }
}