diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java deleted file mode 100644 index db2304e12..000000000 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.rsocket; - -import org.reactivestreams.Publisher; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; - -/** - * Contract to perform RSocket requests from client to server, transparently connecting and ensuring - * a single, shared connection to make requests with. - * - *

{@code RSocketClient} contains a {@code Mono} {@link #source() source}. It uses it to - * obtain a live, shared {@link RSocket} connection on the first request and on subsequent requests - * if the connection is lost. This eliminates the need to obtain a connection first, and makes it - * easy to pass a single {@code RSocketClient} to use from multiple places. - * - *

Request methods of {@code RSocketClient} allow multiple subscriptions with each subscription - * performing a new request. Therefore request methods accept {@code Mono} rather than - * {@code Payload} as on {@link RSocket}. By contrast, {@link RSocket} request methods cannot be - * subscribed to more than once. - * - *

Use {@link io.rsocket.core.RSocketConnector RSocketConnector} to create a client: - * - *

{@code
- * RSocketClient client =
- *         RSocketConnector.create()
- *                 .metadataMimeType("message/x.rsocket.composite-metadata.v0")
- *                 .dataMimeType("application/cbor")
- *                 .toRSocketClient(TcpClientTransport.create("localhost", 7000));
- * }
- * - *

Use the {@link io.rsocket.core.RSocketConnector#reconnect(Retry) RSocketConnector#reconnect} - * method to configure the retry logic to use whenever a shared {@code RSocket} connection needs to - * be obtained: - * - *

{@code
- * RSocketClient client =
- *         RSocketConnector.create()
- *                 .metadataMimeType("message/x.rsocket.composite-metadata.v0")
- *                 .dataMimeType("application/cbor")
- *                 .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)))
- *                 .toRSocketClient(TcpClientTransport.create("localhost", 7000));
- * }
- * - * @since 1.0.1 - */ -public interface RSocketClient extends Disposable { - - /** Return the underlying source used to obtain a shared {@link RSocket} connection. */ - Mono source(); - - /** - * Perform a Fire-and-Forget interaction via {@link RSocket#fireAndForget(Payload)}. Allows - * multiple subscriptions and performs a request per subscriber. - */ - Mono fireAndForget(Mono payloadMono); - - /** - * Perform a Request-Response interaction via {@link RSocket#requestResponse(Payload)}. Allows - * multiple subscriptions and performs a request per subscriber. - */ - Mono requestResponse(Mono payloadMono); - - /** - * Perform a Request-Stream interaction via {@link RSocket#requestStream(Payload)}. Allows - * multiple subscriptions and performs a request per subscriber. - */ - Flux requestStream(Mono payloadMono); - - /** - * Perform a Request-Channel interaction via {@link RSocket#requestChannel(Publisher)}. Allows - * multiple subscriptions and performs a request per subscriber. - */ - Flux requestChannel(Publisher payloads); - - /** - * Perform a Metadata Push via {@link RSocket#metadataPush(Payload)}. Allows multiple - * subscriptions and performs a request per subscriber. - */ - Mono metadataPush(Mono payloadMono); -} diff --git a/rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java b/rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java index 24fa8f84c..4dc250158 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java @@ -1,10 +1,24 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.rsocket.core; import io.netty.util.IllegalReferenceCountException; import io.netty.util.ReferenceCounted; import io.rsocket.Payload; import io.rsocket.RSocket; -import io.rsocket.RSocketClient; import io.rsocket.frame.FrameType; import java.util.AbstractMap; import java.util.Map; @@ -57,7 +71,11 @@ class DefaultRSocketClient extends ResolvingOperator AtomicReferenceFieldUpdater.newUpdater(DefaultRSocketClient.class, Subscription.class, "s"); DefaultRSocketClient(Mono source) { - this.source = source; + this.source = unwrapReconnectMono(source); + } + + private Mono unwrapReconnectMono(Mono source) { + return source instanceof ReconnectMono ? ((ReconnectMono) source).getSource() : source; } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketClient.java new file mode 100644 index 000000000..81392e661 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketClient.java @@ -0,0 +1,137 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.core; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import org.reactivestreams.Publisher; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Contract for performing RSocket requests. + * + *

{@link RSocketClient} differs from {@link RSocket} in a number of ways: + * + *

    + *
  • {@code RSocket} represents a "live" connection that is transient and needs to be obtained + * typically from a {@code Mono} source via {@code flatMap} or block. By contrast, + * {@code RSocketClient} is a higher level layer that contains such a {@link #source() source} + * of connections and transparently obtains and re-obtains a shared connection as needed when + * requests are made concurrently. That means an {@code RSocketClient} can simply be created + * once, even before a connection is established, and shared as a singleton across multiple + * places as you would with any other client. + *
  • For request input {@code RSocket} accepts an instance of {@code Payload} and does not allow + * more than one subscription per request because there is no way to safely re-use that input. + * By contrast {@code RSocketClient} accepts {@code Publisher} and allow + * re-subscribing which repeats the request. + *
  • {@code RSocket} can be used for sending and it can also be implemented for receiving. By + * contrast {@code RSocketClient} is used only for sending, typically from the client side + * which allows obtaining and re-obtaining connections from a source as needed. However it can + * also be used from the server side by {@link #from(RSocket) wrapping} the "live" {@code + * RSocket} for a given connection. + *
+ * + *

The example below shows how to create an {@code RSocketClient}: + * + *

{@code
+ * Mono source =
+ *         RSocketConnector.create()
+ *                 .metadataMimeType("message/x.rsocket.composite-metadata.v0")
+ *                 .dataMimeType("application/cbor")
+ *                 .connect(TcpClientTransport.create("localhost", 7000));
+ *
+ * RSocketClient client = RSocketClient.from(source);
+ * }
+ * + *

The below configures retry logic to use when a shared {@code RSocket} connection is obtained: + * + *

{@code
+ * Mono source =
+ *         RSocketConnector.create()
+ *                 .metadataMimeType("message/x.rsocket.composite-metadata.v0")
+ *                 .dataMimeType("application/cbor")
+ *                 .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)))
+ *                 .connect(TcpClientTransport.create("localhost", 7000));
+ *
+ * RSocketClient client = RSocketClient.from(source);
+ * }
+ * + * @since 1.1 + * @see io.rsocket.loadbalance.LoadbalanceRSocketClient + */ +public interface RSocketClient extends Disposable { + + /** Return the underlying source used to obtain a shared {@link RSocket} connection. */ + Mono source(); + + /** + * Perform a Fire-and-Forget interaction via {@link RSocket#fireAndForget(Payload)}. Allows + * multiple subscriptions and performs a request per subscriber. + */ + Mono fireAndForget(Mono payloadMono); + + /** + * Perform a Request-Response interaction via {@link RSocket#requestResponse(Payload)}. Allows + * multiple subscriptions and performs a request per subscriber. + */ + Mono requestResponse(Mono payloadMono); + + /** + * Perform a Request-Stream interaction via {@link RSocket#requestStream(Payload)}. Allows + * multiple subscriptions and performs a request per subscriber. + */ + Flux requestStream(Mono payloadMono); + + /** + * Perform a Request-Channel interaction via {@link RSocket#requestChannel(Publisher)}. Allows + * multiple subscriptions and performs a request per subscriber. + */ + Flux requestChannel(Publisher payloads); + + /** + * Perform a Metadata Push via {@link RSocket#metadataPush(Payload)}. Allows multiple + * subscriptions and performs a request per subscriber. + */ + Mono metadataPush(Mono payloadMono); + + /** + * Create an {@link RSocketClient} that obtains shared connections as needed, when requests are + * made, from the given {@code Mono} source. + * + * @param source the source for connections, typically prepared via {@link RSocketConnector}. + * @return the created client instance + */ + static RSocketClient from(Mono source) { + return new DefaultRSocketClient(source); + } + + /** + * Adapt the given {@link RSocket} to use as {@link RSocketClient}. This is useful to wrap the + * sending {@code RSocket} in a server. + * + *

Note: unlike an {@code RSocketClient} created via {@link + * RSocketClient#from(Mono)}, the instance returned from this factory method can only perform + * requests for as long as the given {@code RSocket} remains "live". + * + * @param rsocket the {@code RSocket} to perform requests with + * @return the created client instance + */ + static RSocketClient from(RSocket rsocket) { + return new RSocketClientAdapter(rsocket); + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketClientAdapter.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketClientAdapter.java index e54bf157d..cc94f4102 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketClientAdapter.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketClientAdapter.java @@ -17,7 +17,6 @@ import io.rsocket.Payload; import io.rsocket.RSocket; -import io.rsocket.RSocketClient; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -30,7 +29,7 @@ * * @since 1.1 */ -public class RSocketClientAdapter implements RSocketClient { +class RSocketClientAdapter implements RSocketClient { private final RSocket rsocket; diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index 692aaca7d..0058106bc 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -25,7 +25,6 @@ import io.rsocket.DuplexConnection; import io.rsocket.Payload; import io.rsocket.RSocket; -import io.rsocket.RSocketClient; import io.rsocket.SocketAcceptor; import io.rsocket.frame.SetupFrameCodec; import io.rsocket.frame.decoder.PayloadDecoder; @@ -120,15 +119,6 @@ public static Mono connectWith(ClientTransport transport) { return RSocketConnector.create().connect(() -> transport); } - /** - * @param transport - * @return - * @since 1.0.1 - */ - public static RSocketClient createRSocketClient(ClientTransport transport) { - return RSocketConnector.create().toRSocketClient(transport); - } - /** * Provide a {@code Mono} from which to obtain the {@code Payload} for the initial SETUP frame. * Data and metadata should be formatted according to the MIME types specified via {@link @@ -485,37 +475,6 @@ public RSocketConnector payloadDecoder(PayloadDecoder decoder) { return this; } - /** - * Create {@link RSocketClient} that will use {@link #connect(ClientTransport)} as its source to - * obtain a live, shared {@code RSocket} when the first request is made, and also on subsequent - * requests after the connection is lost. - * - *

The following transports are available through additional RSocket Java modules: - * - *

    - *
  • {@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via - * {@code rsocket-transport-netty}. - *
  • {@link io.rsocket.transport.netty.client.WebsocketClientTransport - * WebsocketClientTransport} via {@code rsocket-transport-netty}. - *
  • {@link io.rsocket.transport.local.LocalClientTransport LocalClientTransport} via {@code - * rsocket-transport-local} - *
- * - * @param transport the transport of choice to connect with - * @return a {@code RSocketClient} with not established connection. Note, connection will be - * established on the first request - * @since 1.0.1 - */ - public RSocketClient toRSocketClient(ClientTransport transport) { - Mono source = connect0(() -> transport); - - if (retrySpec != null) { - source = source.retryWhen(retrySpec); - } - - return new DefaultRSocketClient(source); - } - /** * Connect with the given transport and obtain a live {@link RSocket} to use for making requests. * Each subscriber to the returned {@code Mono} receives a new connection, if neither {@link @@ -549,19 +508,6 @@ public Mono connect(ClientTransport transport) { * @return a {@code Mono} with the connected RSocket */ public Mono connect(Supplier transportSupplier) { - return this.connect0(transportSupplier) - .as( - source -> { - if (retrySpec != null) { - return new ReconnectMono<>( - source.retryWhen(retrySpec), Disposable::dispose, INVALIDATE_FUNCTION); - } else { - return source; - } - }); - } - - private Mono connect0(Supplier transportSupplier) { return Mono.fromSupplier(transportSupplier) .flatMap( ct -> { @@ -692,6 +638,15 @@ private Mono connect0(Supplier transportSupplier) { }) .doFinally(signalType -> setup.release()); }); + }) + .as( + source -> { + if (retrySpec != null) { + return new ReconnectMono<>( + source.retryWhen(retrySpec), Disposable::dispose, INVALIDATE_FUNCTION); + } else { + return source; + } }); } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java b/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java index 44e4ffa81..afad6e0df 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java @@ -48,6 +48,10 @@ final class ReconnectMono extends Mono implements Invalidatable, Disposabl this.resolvingInner = new ResolvingInner<>(this); } + public Mono getSource() { + return source; + } + @Override public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return source; diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java index 18600e633..2dd8ecc72 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java @@ -17,7 +17,7 @@ import io.rsocket.Payload; import io.rsocket.RSocket; -import io.rsocket.RSocketClient; +import io.rsocket.core.RSocketClient; import io.rsocket.core.RSocketConnector; import java.util.List; import org.reactivestreams.Publisher; diff --git a/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java b/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java index d41ed16c1..d080b166d 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java +++ b/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java @@ -26,7 +26,6 @@ import io.netty.util.ReferenceCounted; import io.rsocket.Payload; import io.rsocket.RSocket; -import io.rsocket.RSocketClient; import io.rsocket.frame.ErrorFrameCodec; import io.rsocket.frame.FrameHeaderCodec; import io.rsocket.frame.FrameType; diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/client/RSocketClientExample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/client/RSocketClientExample.java index e1bf459b9..2d19b9ce4 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/client/RSocketClientExample.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/client/RSocketClientExample.java @@ -1,8 +1,9 @@ package io.rsocket.examples.transport.tcp.client; import io.rsocket.Payload; -import io.rsocket.RSocketClient; +import io.rsocket.RSocket; import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketClient; import io.rsocket.core.RSocketConnector; import io.rsocket.core.RSocketServer; import io.rsocket.transport.netty.client.TcpClientTransport; @@ -35,12 +36,12 @@ public static void main(String[] args) { .doOnNext(cc -> logger.info("Server started on the address : {}", cc.address())) .subscribe(); - RSocketClient rSocketClient = + Mono source = RSocketConnector.create() .reconnect(Retry.backoff(50, Duration.ofMillis(500))) - .toRSocketClient(TcpClientTransport.create("localhost", 7000)); + .connect(TcpClientTransport.create("localhost", 7000)); - rSocketClient + RSocketClient.from(source) .requestResponse(Mono.just(DefaultPayload.create("Test Request"))) .doOnSubscribe(s -> logger.info("Executing Request")) .doOnNext( diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/loadbalancer/RoundRobinRSocketLoadbalancerExample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/loadbalancer/RoundRobinRSocketLoadbalancerExample.java index 2c1d8c37c..27d10b472 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/loadbalancer/RoundRobinRSocketLoadbalancerExample.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/loadbalancer/RoundRobinRSocketLoadbalancerExample.java @@ -15,8 +15,8 @@ */ package io.rsocket.examples.transport.tcp.loadbalancer; -import io.rsocket.RSocketClient; import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketClient; import io.rsocket.core.RSocketServer; import io.rsocket.loadbalance.LoadbalanceRSocketClient; import io.rsocket.loadbalance.LoadbalanceTarget;