From 261ce415c58c984a1cdbc7a962d471625265a570 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Tue, 3 Jan 2017 22:45:32 -0800 Subject: [PATCH] Duplex interactions #### Problem Duplex interactions (server sending requests to the client) are broken. #### Modification - `ReactiveSocketServer` wasn't using `ClientServerInputMultiplexer` to split input into client and server sockets. This made the responses sent from the client to not reach `ClientReactiveSocket`. Modified `ReactiveSocketServer` to use `ClientServerInputMultiplexer`. - `ClientReactiveSocket` didn't handle a request-stream/channel request before transport subscription arrived. Now using a buffering subscription to buffer request-n and cancel till transport subscription arrives. - Added `DuplexClient` example to demonstrate duplex interactions. #### Result Duplex interactions works! --- .../reactivesocket/ClientReactiveSocket.java | 50 ++++++++++++-- .../reactivesocket/ServerReactiveSocket.java | 2 - .../server/ReactiveSocketServer.java | 19 +++-- .../test/util/LocalDuplexConnection.java | 10 ++- .../transport/tcp/duplex/DuplexClient.java | 69 +++++++++++++++++++ 5 files changed, 135 insertions(+), 15 deletions(-) create mode 100644 reactivesocket-examples/src/main/java/io/reactivesocket/examples/transport/tcp/duplex/DuplexClient.java diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java b/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java index 111550ac4..a71725eac 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java @@ -26,6 +26,7 @@ import io.reactivesocket.lease.LeaseImpl; import io.reactivesocket.reactivestreams.extensions.DefaultSubscriber; import io.reactivesocket.reactivestreams.extensions.Px; +import io.reactivesocket.reactivestreams.extensions.internal.FlowControlHelper; import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription; import io.reactivesocket.reactivestreams.extensions.internal.subscribers.CancellableSubscriber; import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers; @@ -54,7 +55,7 @@ public class ClientReactiveSocket implements ReactiveSocket { private final Int2ObjectHashMap senders; private final Int2ObjectHashMap> receivers; - private volatile Subscription transportReceiveSubscription; + private final BufferingSubscription transportReceiveSubscription = new BufferingSubscription(); private CancellableSubscriber keepAliveSendSub; private volatile Consumer leaseConsumer; // Provided on start() @@ -190,7 +191,7 @@ private void startKeepAlive() { private void startReceivingRequests() { Px .from(connection.receive()) - .doOnSubscribe(subscription -> transportReceiveSubscription = subscription) + .doOnSubscribe(subscription -> transportReceiveSubscription.switchTo(subscription)) .doOnNext(this::handleIncomingFrames) .subscribe(); } @@ -200,9 +201,7 @@ protected void cleanup() { if (null != keepAliveSendSub) { keepAliveSendSub.cancel(); } - if (null != transportReceiveSubscription) { - transportReceiveSubscription.cancel(); - } + transportReceiveSubscription.cancel(); } private void handleIncomingFrames(Frame frame) { @@ -352,4 +351,45 @@ private synchronized void registerSenderReceiver(int streamId, Subscription send senders.put(streamId, sender); receivers.put(streamId, receiver); } + + private static class BufferingSubscription implements Subscription { + + private int requested; + private boolean cancelled; + private Subscription delegate; + + @Override + public void request(long n) { + if (relay()) { + delegate.request(n); + } else { + requested = FlowControlHelper.incrementRequestN(requested, n); + } + } + + @Override + public void cancel() { + if (relay()) { + delegate.cancel(); + } else { + cancelled = true; + } + } + + private void switchTo(Subscription subscription) { + synchronized (this) { + delegate = subscription; + } + if (requested > 0) { + subscription.request(requested); + } + if (cancelled) { + subscription.cancel(); + } + } + + private synchronized boolean relay() { + return delegate != null; + } + } } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/ServerReactiveSocket.java b/reactivesocket-core/src/main/java/io/reactivesocket/ServerReactiveSocket.java index 0bc6261fb..f40c7cfed 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/ServerReactiveSocket.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/ServerReactiveSocket.java @@ -20,10 +20,8 @@ import io.reactivesocket.Frame.Request; import io.reactivesocket.Frame.Response; import io.reactivesocket.exceptions.ApplicationException; -import io.reactivesocket.exceptions.RejectedSetupException; import io.reactivesocket.exceptions.SetupException; import io.reactivesocket.frame.FrameHeaderFlyweight; -import io.reactivesocket.frame.SetupFrameFlyweight; import io.reactivesocket.internal.KnownErrorFilter; import io.reactivesocket.internal.RemoteReceiver; import io.reactivesocket.internal.RemoteSender; diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/server/ReactiveSocketServer.java b/reactivesocket-core/src/main/java/io/reactivesocket/server/ReactiveSocketServer.java index 61d66afc1..09560990f 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/server/ReactiveSocketServer.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/server/ReactiveSocketServer.java @@ -24,7 +24,10 @@ import io.reactivesocket.StreamIdSupplier; import io.reactivesocket.client.KeepAliveProvider; import io.reactivesocket.exceptions.SetupException; +import io.reactivesocket.internal.ClientServerInputMultiplexer; +import io.reactivesocket.lease.DefaultLeaseHonoringSocket; import io.reactivesocket.lease.LeaseEnforcingSocket; +import io.reactivesocket.lease.LeaseHonoringSocket; import io.reactivesocket.reactivestreams.extensions.Px; import io.reactivesocket.transport.TransportServer; import io.reactivesocket.transport.TransportServer.StartedServer; @@ -42,24 +45,28 @@ public interface ReactiveSocketServer { static ReactiveSocketServer create(TransportServer transportServer) { return acceptor -> { - return transportServer.start(duplexConnection -> { - return Px.from(duplexConnection.receive()) + return transportServer.start(connection -> { + return Px.from(connection.receive()) .switchTo(setupFrame -> { if (setupFrame.getType() == FrameType.SETUP) { + ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection); ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame); - ClientReactiveSocket sender = new ClientReactiveSocket(duplexConnection, + ClientReactiveSocket sender = new ClientReactiveSocket(multiplexer.asServerConnection(), Throwable::printStackTrace, StreamIdSupplier.serverSupplier(), KeepAliveProvider.never()); + LeaseHonoringSocket lhs = new DefaultLeaseHonoringSocket(sender); + sender.start(lhs); LeaseEnforcingSocket handler = acceptor.accept(setupPayload, sender); - ServerReactiveSocket receiver = new ServerReactiveSocket(duplexConnection, handler, + ServerReactiveSocket receiver = new ServerReactiveSocket(multiplexer.asClientConnection(), + handler, setupPayload.willClientHonorLease(), Throwable::printStackTrace); receiver.start(); - return duplexConnection.onClose(); + return connection.onClose(); } else { return Px.error(new IllegalStateException("Invalid first frame on the connection: " - + duplexConnection + ", frame type received: " + + connection + ", frame type received: " + setupFrame.getType())); } }); diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/test/util/LocalDuplexConnection.java b/reactivesocket-core/src/test/java/io/reactivesocket/test/util/LocalDuplexConnection.java index 30f993d70..50fb05c1a 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/test/util/LocalDuplexConnection.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/test/util/LocalDuplexConnection.java @@ -19,6 +19,7 @@ import io.reactivesocket.DuplexConnection; import io.reactivesocket.Frame; import io.reactivesocket.reactivestreams.extensions.Px; +import io.reactivesocket.reactivestreams.extensions.internal.EmptySubject; import io.reactivex.Flowable; import io.reactivex.processors.PublishProcessor; import org.reactivestreams.Publisher; @@ -26,12 +27,14 @@ public class LocalDuplexConnection implements DuplexConnection { private final PublishProcessor send; private final PublishProcessor receive; + private final EmptySubject closeNotifier; private final String name; public LocalDuplexConnection(String name, PublishProcessor send, PublishProcessor receive) { this.name = name; this.send = send; this.receive = receive; + closeNotifier = new EmptySubject(); } @Override @@ -56,11 +59,14 @@ public double availability() { @Override public Publisher close() { - return Px.empty(); + return Px.defer(() -> { + closeNotifier.onComplete(); + return Px.empty(); + }); } @Override public Publisher onClose() { - return Px.empty(); + return closeNotifier; } } diff --git a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/transport/tcp/duplex/DuplexClient.java b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/transport/tcp/duplex/DuplexClient.java new file mode 100644 index 000000000..7bcdf7a48 --- /dev/null +++ b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/transport/tcp/duplex/DuplexClient.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.examples.transport.tcp.duplex; + +import io.reactivesocket.AbstractReactiveSocket; +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.client.ReactiveSocketClient; +import io.reactivesocket.client.ReactiveSocketClient.SocketAcceptor; +import io.reactivesocket.frame.ByteBufferUtil; +import io.reactivesocket.lease.DisabledLeaseAcceptingSocket; +import io.reactivesocket.lease.LeaseEnforcingSocket; +import io.reactivesocket.server.ReactiveSocketServer; +import io.reactivesocket.transport.TransportServer.StartedServer; +import io.reactivesocket.transport.tcp.client.TcpTransportClient; +import io.reactivesocket.transport.tcp.server.TcpTransportServer; +import io.reactivesocket.util.PayloadImpl; +import io.reactivex.Flowable; +import org.reactivestreams.Publisher; + +import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; + +import static io.reactivesocket.client.KeepAliveProvider.*; +import static io.reactivesocket.client.SetupProvider.*; + +public final class DuplexClient { + + public static void main(String[] args) { + StartedServer server = ReactiveSocketServer.create(TcpTransportServer.create()) + .start((setupPayload, reactiveSocket) -> { + Flowable.fromPublisher(reactiveSocket.requestStream(new PayloadImpl("Hello-Bidi"))) + .map(Payload::getData) + .map(ByteBufferUtil::toUtf8String) + .forEach(System.out::println); + return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() { }); + }); + + SocketAddress address = server.getServerAddress(); + + ReactiveSocketClient rsclient = ReactiveSocketClient.createDuplex(TcpTransportClient.create(address), + new SocketAcceptor() { + @Override + public LeaseEnforcingSocket accept(ReactiveSocket reactiveSocket) { + return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() { + @Override + public Publisher requestStream(Payload payload) { + return Flowable.interval(0, 1, TimeUnit.SECONDS).map(aLong -> new PayloadImpl("Bi-di Response => " + aLong)); + } + }); + } + }, keepAlive(never()).disableLease()); + + ReactiveSocket socket = Flowable.fromPublisher(rsclient.connect()).blockingFirst(); + + Flowable.fromPublisher(socket.onClose()).ignoreElements().blockingAwait(); + } +}