Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 0 additions & 97 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.core;

import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCounted;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketClient;
import io.rsocket.frame.FrameType;
import java.util.AbstractMap;
import java.util.Map;
Expand Down Expand Up @@ -57,7 +71,11 @@ class DefaultRSocketClient extends ResolvingOperator<RSocket>
AtomicReferenceFieldUpdater.newUpdater(DefaultRSocketClient.class, Subscription.class, "s");

DefaultRSocketClient(Mono<RSocket> source) {
this.source = source;
this.source = unwrapReconnectMono(source);
}

private Mono<RSocket> unwrapReconnectMono(Mono<RSocket> source) {
return source instanceof ReconnectMono ? ((ReconnectMono<RSocket>) source).getSource() : source;
}

@Override
Expand Down
137 changes: 137 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.core;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* Contract for performing RSocket requests.
*
* <p>{@link RSocketClient} differs from {@link RSocket} in a number of ways:
*
* <ul>
* <li>{@code RSocket} represents a "live" connection that is transient and needs to be obtained
* typically from a {@code Mono<RSocket>} source via {@code flatMap} or block. By contrast,
* {@code RSocketClient} is a higher level layer that contains such a {@link #source() source}
* of connections and transparently obtains and re-obtains a shared connection as needed when
* requests are made concurrently. That means an {@code RSocketClient} can simply be created
* once, even before a connection is established, and shared as a singleton across multiple
* places as you would with any other client.
* <li>For request input {@code RSocket} accepts an instance of {@code Payload} and does not allow
* more than one subscription per request because there is no way to safely re-use that input.
* By contrast {@code RSocketClient} accepts {@code Publisher<Payload>} and allow
* re-subscribing which repeats the request.
* <li>{@code RSocket} can be used for sending and it can also be implemented for receiving. By
* contrast {@code RSocketClient} is used only for sending, typically from the client side
* which allows obtaining and re-obtaining connections from a source as needed. However it can
* also be used from the server side by {@link #from(RSocket) wrapping} the "live" {@code
* RSocket} for a given connection.
* </ul>
*
* <p>The example below shows how to create an {@code RSocketClient}:
*
* <pre class="code">{@code
* Mono<RSocket> source =
* RSocketConnector.create()
* .metadataMimeType("message/x.rsocket.composite-metadata.v0")
* .dataMimeType("application/cbor")
* .connect(TcpClientTransport.create("localhost", 7000));
*
* RSocketClient client = RSocketClient.from(source);
* }</pre>
*
* <p>The below configures retry logic to use when a shared {@code RSocket} connection is obtained:
*
* <pre class="code">{@code
* Mono<RSocket> source =
* RSocketConnector.create()
* .metadataMimeType("message/x.rsocket.composite-metadata.v0")
* .dataMimeType("application/cbor")
* .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)))
* .connect(TcpClientTransport.create("localhost", 7000));
*
* RSocketClient client = RSocketClient.from(source);
* }</pre>
*
* @since 1.1
* @see io.rsocket.loadbalance.LoadbalanceRSocketClient
*/
public interface RSocketClient extends Disposable {

/** Return the underlying source used to obtain a shared {@link RSocket} connection. */
Mono<RSocket> source();

/**
* Perform a Fire-and-Forget interaction via {@link RSocket#fireAndForget(Payload)}. Allows
* multiple subscriptions and performs a request per subscriber.
*/
Mono<Void> fireAndForget(Mono<Payload> payloadMono);

/**
* Perform a Request-Response interaction via {@link RSocket#requestResponse(Payload)}. Allows
* multiple subscriptions and performs a request per subscriber.
*/
Mono<Payload> requestResponse(Mono<Payload> payloadMono);

/**
* Perform a Request-Stream interaction via {@link RSocket#requestStream(Payload)}. Allows
* multiple subscriptions and performs a request per subscriber.
*/
Flux<Payload> requestStream(Mono<Payload> payloadMono);

/**
* Perform a Request-Channel interaction via {@link RSocket#requestChannel(Publisher)}. Allows
* multiple subscriptions and performs a request per subscriber.
*/
Flux<Payload> requestChannel(Publisher<Payload> payloads);

/**
* Perform a Metadata Push via {@link RSocket#metadataPush(Payload)}. Allows multiple
* subscriptions and performs a request per subscriber.
*/
Mono<Void> metadataPush(Mono<Payload> payloadMono);

/**
* Create an {@link RSocketClient} that obtains shared connections as needed, when requests are
* made, from the given {@code Mono<RSocket>} source.
*
* @param source the source for connections, typically prepared via {@link RSocketConnector}.
* @return the created client instance
*/
static RSocketClient from(Mono<RSocket> source) {
return new DefaultRSocketClient(source);
}

/**
* Adapt the given {@link RSocket} to use as {@link RSocketClient}. This is useful to wrap the
* sending {@code RSocket} in a server.
*
* <p><strong>Note:</strong> unlike an {@code RSocketClient} created via {@link
* RSocketClient#from(Mono)}, the instance returned from this factory method can only perform
* requests for as long as the given {@code RSocket} remains "live".
*
* @param rsocket the {@code RSocket} to perform requests with
* @return the created client instance
*/
static RSocketClient from(RSocket rsocket) {
return new RSocketClientAdapter(rsocket);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketClient;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -30,7 +29,7 @@
*
* @since 1.1
*/
public class RSocketClientAdapter implements RSocketClient {
class RSocketClientAdapter implements RSocketClient {

private final RSocket rsocket;

Expand Down
63 changes: 9 additions & 54 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketClient;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
Expand Down Expand Up @@ -120,15 +119,6 @@ public static Mono<RSocket> connectWith(ClientTransport transport) {
return RSocketConnector.create().connect(() -> transport);
}

/**
* @param transport
* @return
* @since 1.0.1
*/
public static RSocketClient createRSocketClient(ClientTransport transport) {
return RSocketConnector.create().toRSocketClient(transport);
}

/**
* Provide a {@code Mono} from which to obtain the {@code Payload} for the initial SETUP frame.
* Data and metadata should be formatted according to the MIME types specified via {@link
Expand Down Expand Up @@ -485,37 +475,6 @@ public RSocketConnector payloadDecoder(PayloadDecoder decoder) {
return this;
}

/**
* Create {@link RSocketClient} that will use {@link #connect(ClientTransport)} as its source to
* obtain a live, shared {@code RSocket} when the first request is made, and also on subsequent
* requests after the connection is lost.
*
* <p>The following transports are available through additional RSocket Java modules:
*
* <ul>
* <li>{@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via
* {@code rsocket-transport-netty}.
* <li>{@link io.rsocket.transport.netty.client.WebsocketClientTransport
* WebsocketClientTransport} via {@code rsocket-transport-netty}.
* <li>{@link io.rsocket.transport.local.LocalClientTransport LocalClientTransport} via {@code
* rsocket-transport-local}
* </ul>
*
* @param transport the transport of choice to connect with
* @return a {@code RSocketClient} with not established connection. Note, connection will be
* established on the first request
* @since 1.0.1
*/
public RSocketClient toRSocketClient(ClientTransport transport) {
Mono<RSocket> source = connect0(() -> transport);

if (retrySpec != null) {
source = source.retryWhen(retrySpec);
}

return new DefaultRSocketClient(source);
}

/**
* Connect with the given transport and obtain a live {@link RSocket} to use for making requests.
* Each subscriber to the returned {@code Mono} receives a new connection, if neither {@link
Expand Down Expand Up @@ -549,19 +508,6 @@ public Mono<RSocket> connect(ClientTransport transport) {
* @return a {@code Mono} with the connected RSocket
*/
public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
return this.connect0(transportSupplier)
.as(
source -> {
if (retrySpec != null) {
return new ReconnectMono<>(
source.retryWhen(retrySpec), Disposable::dispose, INVALIDATE_FUNCTION);
} else {
return source;
}
});
}

private Mono<RSocket> connect0(Supplier<ClientTransport> transportSupplier) {
return Mono.fromSupplier(transportSupplier)
.flatMap(
ct -> {
Expand Down Expand Up @@ -692,6 +638,15 @@ private Mono<RSocket> connect0(Supplier<ClientTransport> transportSupplier) {
})
.doFinally(signalType -> setup.release());
});
})
.as(
source -> {
if (retrySpec != null) {
return new ReconnectMono<>(
source.retryWhen(retrySpec), Disposable::dispose, INVALIDATE_FUNCTION);
} else {
return source;
}
});
}
}
Loading