From 48acae80638912b71909167120ce310c27ef5533 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Mon, 28 Jan 2019 01:16:11 -0800 Subject: [PATCH 1/4] optimize request channel to let you peak at the first item in the requestChannel Signed-off-by: Robert Roeser --- .../main/java/io/rsocket/RSocketServer.java | 112 ++++++++++++------ .../main/java/io/rsocket/RequestHandler.java | 19 +++ 2 files changed, 94 insertions(+), 37 deletions(-) create mode 100644 rsocket-core/src/main/java/io/rsocket/RequestHandler.java diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index e29d4d1f1..b8167032f 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -27,7 +27,10 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.Disposable; -import reactor.core.publisher.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.UnicastProcessor; import java.util.Collections; import java.util.Map; @@ -39,15 +42,17 @@ import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M; /** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */ -class RSocketServer implements RSocket { +class RSocketServer implements RequestHandler { private final DuplexConnection connection; private final RSocket requestHandler; + private final RequestHandler optimizedRequestHandler; + private final boolean hasOptimizedRequestHandler; private final Function frameDecoder; private final Consumer errorConsumer; private final Map sendingSubscriptions; - private final Map> channelProcessors; + private final Map> channelProcessors; private final UnboundedProcessor sendProcessor; private KeepAliveHandler keepAliveHandler; @@ -69,12 +74,22 @@ class RSocketServer implements RSocket { Consumer errorConsumer, long tickPeriod, long ackTimeout) { + + if (requestHandler instanceof RequestHandler) { + this.optimizedRequestHandler = (RequestHandler) requestHandler; + this.hasOptimizedRequestHandler = true; + this.requestHandler = null; + } else { + this.hasOptimizedRequestHandler = false; + this.requestHandler = requestHandler; + this.optimizedRequestHandler = null; + } + this.connection = connection; - this.requestHandler = requestHandler; this.frameDecoder = frameDecoder; this.errorConsumer = errorConsumer; this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>()); - this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>()); + this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>()); // DO NOT Change the order here. The Send processor must be subscribed to before receiving // connections @@ -116,21 +131,27 @@ class RSocketServer implements RSocket { } private void handleSendProcessorError(Throwable t) { - sendingSubscriptions.values().forEach(subscription -> { - try { - subscription.cancel(); - } catch (Throwable e) { - errorConsumer.accept(e); - } - }); + sendingSubscriptions + .values() + .forEach( + subscription -> { + try { + subscription.cancel(); + } catch (Throwable e) { + errorConsumer.accept(e); + } + }); - channelProcessors.values().forEach(subscription -> { - try { - subscription.onError(t); - } catch (Throwable e) { - errorConsumer.accept(e); - } - }); + channelProcessors + .values() + .forEach( + subscription -> { + try { + subscription.onError(t); + } catch (Throwable e) { + errorConsumer.accept(e); + } + }); } private void handleSendProcessorCancel(SignalType t) { @@ -138,21 +159,27 @@ private void handleSendProcessorCancel(SignalType t) { return; } - sendingSubscriptions.values().forEach(subscription -> { - try { - subscription.cancel(); - } catch (Throwable e) { - errorConsumer.accept(e); - } - }); + sendingSubscriptions + .values() + .forEach( + subscription -> { + try { + subscription.cancel(); + } catch (Throwable e) { + errorConsumer.accept(e); + } + }); - channelProcessors.values().forEach(subscription -> { - try { - subscription.onComplete(); - } catch (Throwable e) { - errorConsumer.accept(e); - } - }); + channelProcessors + .values() + .forEach( + subscription -> { + try { + subscription.onComplete(); + } catch (Throwable e) { + errorConsumer.accept(e); + } + }); } @Override @@ -191,6 +218,15 @@ public Flux requestChannel(Publisher payloads) { } } + @Override + public Flux requestChannel(Payload payload, Publisher payloads) { + try { + return optimizedRequestHandler.requestChannel(payloads); + } catch (Throwable t) { + return Flux.error(t); + } + } + @Override public Mono metadataPush(Payload payload) { try { @@ -232,9 +268,7 @@ private synchronized void cleanUpSendingSubscriptions() { } private synchronized void cleanUpChannelProcessors() { - channelProcessors - .values() - .forEach(Processor::onComplete); + channelProcessors.values().forEach(Processor::onComplete); channelProcessors.clear(); } @@ -381,7 +415,11 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) { // and any later payload can be processed frames.onNext(payload); - handleStream(streamId, requestChannel(payloads), initialRequestN); + if (hasOptimizedRequestHandler) { + handleStream(streamId, requestChannel(payload, payloads), initialRequestN); + } else { + handleStream(streamId, requestChannel(payloads), initialRequestN); + } } private void handleKeepAliveFrame(Frame frame) { diff --git a/rsocket-core/src/main/java/io/rsocket/RequestHandler.java b/rsocket-core/src/main/java/io/rsocket/RequestHandler.java new file mode 100644 index 000000000..077781eb7 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/RequestHandler.java @@ -0,0 +1,19 @@ +package io.rsocket; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +/** + * Extends the {@link RSocket} that allows an implementor to peek at the first request payload of a channel. + */ +public interface RequestHandler extends RSocket { + /** + * Implement this method to peak at the first payload of the incoming request stream + * @param payload First payload in the stream + * @param payloads Stream of request payloads. + * @return Stream of response payloads. + */ + default Flux requestChannel(Payload payload, Publisher payloads) { + return requestChannel(payloads); + } +} From 9c5ac91b11e9c7bafc4c5f0c3b91ebbc67bb148f Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Mon, 28 Jan 2019 11:33:14 -0800 Subject: [PATCH 2/4] updated javadoc Signed-off-by: Robert Roeser --- .../src/main/java/io/rsocket/RequestHandler.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RequestHandler.java b/rsocket-core/src/main/java/io/rsocket/RequestHandler.java index 077781eb7..03f7c2b23 100644 --- a/rsocket-core/src/main/java/io/rsocket/RequestHandler.java +++ b/rsocket-core/src/main/java/io/rsocket/RequestHandler.java @@ -4,12 +4,16 @@ import reactor.core.publisher.Flux; /** - * Extends the {@link RSocket} that allows an implementor to peek at the first request payload of a channel. + * Extends the {@link RSocket} that allows an implementer to peek at the first request payload of a + * channel. */ public interface RequestHandler extends RSocket { /** - * Implement this method to peak at the first payload of the incoming request stream - * @param payload First payload in the stream + * Implement this method to peak at the first payload of the incoming request stream without + * having to subscribe to Publish<Payload> payloads + * + * @param payload First payload in the stream - this is the same payload as the first payload in + * Publisher<Payload> payloads * @param payloads Stream of request payloads. * @return Stream of response payloads. */ From 92a4d1d01acc389cf401e5f96644c1b17eb67048 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Tue, 29 Jan 2019 15:58:25 -0800 Subject: [PATCH 3/4] updates Signed-off-by: Robert Roeser --- .../io/rsocket/ConnectionSetupPayload.java | 2 +- .../main/java/io/rsocket/RSocketServer.java | 23 +++++++------------ ...uestHandler.java => ResponderRSocket.java} | 2 +- 3 files changed, 10 insertions(+), 17 deletions(-) rename rsocket-core/src/main/java/io/rsocket/{RequestHandler.java => ResponderRSocket.java} (93%) diff --git a/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java b/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java index d88cfe445..38706fba2 100644 --- a/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java @@ -25,7 +25,7 @@ import io.rsocket.framing.FrameType; /** - * Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data + * Exposed to server for determination of ResponderRSocket based on mime types and SETUP metadata/data */ public abstract class ConnectionSetupPayload extends AbstractReferenceCounted implements Payload { diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index b8167032f..219c7199a 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -42,12 +42,11 @@ import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M; /** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */ -class RSocketServer implements RequestHandler { +class RSocketServer implements ResponderRSocket { private final DuplexConnection connection; private final RSocket requestHandler; - private final RequestHandler optimizedRequestHandler; - private final boolean hasOptimizedRequestHandler; + private final ResponderRSocket responderRSocket; private final Function frameDecoder; private final Consumer errorConsumer; @@ -74,16 +73,10 @@ class RSocketServer implements RequestHandler { Consumer errorConsumer, long tickPeriod, long ackTimeout) { - - if (requestHandler instanceof RequestHandler) { - this.optimizedRequestHandler = (RequestHandler) requestHandler; - this.hasOptimizedRequestHandler = true; - this.requestHandler = null; - } else { - this.hasOptimizedRequestHandler = false; - this.requestHandler = requestHandler; - this.optimizedRequestHandler = null; - } + + this.requestHandler = requestHandler; + this.responderRSocket = + (requestHandler instanceof ResponderRSocket) ? (ResponderRSocket) requestHandler : null; this.connection = connection; this.frameDecoder = frameDecoder; @@ -221,7 +214,7 @@ public Flux requestChannel(Publisher payloads) { @Override public Flux requestChannel(Payload payload, Publisher payloads) { try { - return optimizedRequestHandler.requestChannel(payloads); + return responderRSocket.requestChannel(payloads); } catch (Throwable t) { return Flux.error(t); } @@ -415,7 +408,7 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) { // and any later payload can be processed frames.onNext(payload); - if (hasOptimizedRequestHandler) { + if (responderRSocket != null) { handleStream(streamId, requestChannel(payload, payloads), initialRequestN); } else { handleStream(streamId, requestChannel(payloads), initialRequestN); diff --git a/rsocket-core/src/main/java/io/rsocket/RequestHandler.java b/rsocket-core/src/main/java/io/rsocket/ResponderRSocket.java similarity index 93% rename from rsocket-core/src/main/java/io/rsocket/RequestHandler.java rename to rsocket-core/src/main/java/io/rsocket/ResponderRSocket.java index 03f7c2b23..f98901472 100644 --- a/rsocket-core/src/main/java/io/rsocket/RequestHandler.java +++ b/rsocket-core/src/main/java/io/rsocket/ResponderRSocket.java @@ -7,7 +7,7 @@ * Extends the {@link RSocket} that allows an implementer to peek at the first request payload of a * channel. */ -public interface RequestHandler extends RSocket { +public interface ResponderRSocket extends RSocket { /** * Implement this method to peak at the first payload of the incoming request stream without * having to subscribe to Publish<Payload> payloads From f2a73a8923689027cbd6a94ffaeac5f3e6e6ba93 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Wed, 30 Jan 2019 17:31:17 -0800 Subject: [PATCH 4/4] call requestChannelWith 2 arguments Signed-off-by: Robert Roeser --- rsocket-core/src/main/java/io/rsocket/RSocketServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index 219c7199a..a226b5c06 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -214,7 +214,7 @@ public Flux requestChannel(Publisher payloads) { @Override public Flux requestChannel(Payload payload, Publisher payloads) { try { - return responderRSocket.requestChannel(payloads); + return responderRSocket.requestChannel(payload, payloads); } catch (Throwable t) { return Flux.error(t); }