Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/RSocketErrorException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket;

import reactor.util.annotation.Nullable;

/**
* Exception that represents an RSocket protocol error.
*
* @see <a href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#error-frame-0x0b">ERROR
* Frame (0x0B)</a>
*/
public class RSocketErrorException extends RuntimeException {

private static final long serialVersionUID = -1628781753426267554L;

private static final int MIN_ERROR_CODE = 0x00000001;

private static final int MAX_ERROR_CODE = 0xFFFFFFFE;

private final int errorCode;

/**
* Constructor with a protocol error code and a message.
*
* @param errorCode the RSocket protocol error code
* @param message error explanation
*/
public RSocketErrorException(int errorCode, String message) {
this(errorCode, message, null);
}

/**
* Alternative to {@link #RSocketErrorException(int, String)} with a root cause.
*
* @param errorCode the RSocket protocol error code
* @param message error explanation
* @param cause a root cause for the error
*/
public RSocketErrorException(int errorCode, String message, @Nullable Throwable cause) {
super(message, cause);
this.errorCode = errorCode;
if (errorCode > MAX_ERROR_CODE && errorCode < MIN_ERROR_CODE) {
throw new IllegalArgumentException(
"Allowed errorCode value should be in range [0x00000001-0xFFFFFFFE]", this);
}
}

/**
* Return the RSocket <a
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#error-codes">error code</a>
* represented by this exception
*
* @return the RSocket protocol error code
*/
public int errorCode() {
return errorCode;
}

@Override
public String toString() {
return getClass().getSimpleName()
+ " (0x"
+ Integer.toHexString(errorCode)
+ "): "
+ getMessage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.DuplexConnection;
Expand All @@ -28,7 +29,6 @@
import io.rsocket.frame.SetupFrameFlyweight;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.internal.ClientSetup;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.lease.LeaseStats;
import io.rsocket.lease.Leases;
Expand All @@ -39,6 +39,7 @@
import io.rsocket.plugins.Plugins;
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.plugins.SocketAcceptorInterceptor;
import io.rsocket.resume.ClientRSocketSession;
import io.rsocket.resume.ExponentialBackoffResumeStrategy;
import io.rsocket.resume.InMemoryResumableFramesStore;
import io.rsocket.resume.ResumableFramesStore;
Expand Down Expand Up @@ -309,10 +310,31 @@ public Mono<RSocket> start() {
return newConnection()
.flatMap(
connection -> {
ClientSetup clientSetup = clientSetup(connection);
ByteBuf resumeToken = clientSetup.resumeToken();
KeepAliveHandler keepAliveHandler = clientSetup.keepAliveHandler();
DuplexConnection wrappedConnection = clientSetup.connection();
ByteBuf resumeToken;
KeepAliveHandler keepAliveHandler;
DuplexConnection wrappedConnection;

if (resumeEnabled) {
resumeToken = resumeTokenSupplier.get();
ClientRSocketSession session =
new ClientRSocketSession(
connection,
allocator,
resumeSessionDuration,
resumeStrategySupplier,
resumeStoreFactory.apply(resumeToken),
resumeStreamTimeout,
resumeCleanupStoreOnKeepAlive)
.continueWith(newConnection())
.resumeToken(resumeToken);
keepAliveHandler =
new KeepAliveHandler.ResumableKeepAliveHandler(session.resumableConnection());
wrappedConnection = session.resumableConnection();
} else {
resumeToken = Unpooled.EMPTY_BUFFER;
keepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler(connection);
wrappedConnection = connection;
}

ClientServerInputMultiplexer multiplexer =
new ClientServerInputMultiplexer(wrappedConnection, plugins, true);
Expand Down Expand Up @@ -405,24 +427,6 @@ private int keepAliveTimeout() {
return (int) (ackTimeout.toMillis() + tickPeriod.toMillis() * missedAcks);
}

private ClientSetup clientSetup(DuplexConnection startConnection) {
if (resumeEnabled) {
ByteBuf resumeToken = resumeTokenSupplier.get();
return new ClientSetup.ResumableClientSetup(
allocator,
startConnection,
newConnection(),
resumeToken,
resumeStoreFactory.apply(resumeToken),
resumeSessionDuration,
resumeStreamTimeout,
resumeStrategySupplier,
resumeCleanupStoreOnKeepAlive);
} else {
return new ClientSetup.DefaultClientSetup(startConnection);
}
}

private Mono<DuplexConnection> newConnection() {
return Mono.fromSupplier(transportClient).flatMap(t -> t.connect(mtu));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.rsocket.frame.SetupFrameFlyweight;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.internal.ServerSetup;
import io.rsocket.lease.Leases;
import io.rsocket.lease.RequesterLeaseHandler;
import io.rsocket.lease.ResponderLeaseHandler;
Expand All @@ -42,7 +41,6 @@
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.SessionManager;
import io.rsocket.transport.ServerTransport;
import io.rsocket.util.ConnectionUtils;
import io.rsocket.util.MultiSubscriberRSocket;
import java.time.Duration;
import java.util.Objects;
Expand Down Expand Up @@ -232,15 +230,25 @@ private Mono<Void> accept(
case RESUME:
return acceptResume(serverSetup, startFrame, multiplexer);
default:
return acceptUnknown(startFrame, multiplexer);
return serverSetup
.sendError(
multiplexer,
new InvalidSetupException(
"invalid setup frame: " + FrameHeaderFlyweight.frameType(startFrame)))
.doFinally(
signalType -> {
startFrame.release();
multiplexer.dispose();
});
}
}

private Mono<Void> acceptSetup(
ServerSetup serverSetup, ByteBuf setupFrame, ClientServerInputMultiplexer multiplexer) {

if (!SetupFrameFlyweight.isSupportedVersion(setupFrame)) {
return sendError(
return serverSetup
.sendError(
multiplexer,
new InvalidSetupException(
"Unsupported version: " + SetupFrameFlyweight.humanReadableVersion(setupFrame)))
Expand All @@ -254,7 +262,8 @@ private Mono<Void> acceptSetup(
boolean isLeaseEnabled = leaseEnabled;

if (SetupFrameFlyweight.honorLease(setupFrame) && !isLeaseEnabled) {
return sendError(multiplexer, new InvalidSetupException("lease is not supported"))
return serverSetup
.sendError(multiplexer, new InvalidSetupException("lease is not supported"))
.doFinally(
signalType -> {
setupFrame.release();
Expand Down Expand Up @@ -295,7 +304,10 @@ private Mono<Void> acceptSetup(
.applySocketAcceptorInterceptor(acceptor)
.accept(setupPayload, wrappedRSocketRequester)
.onErrorResume(
err -> sendError(multiplexer, rejectedSetupError(err)).then(Mono.error(err)))
err ->
serverSetup
.sendError(multiplexer, rejectedSetupError(err))
.then(Mono.error(err)))
.doOnNext(
rSocketHandler -> {
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);
Expand Down Expand Up @@ -355,22 +367,6 @@ private ServerSetup serverSetup() {
: new ServerSetup.DefaultServerSetup(allocator);
}

private Mono<Void> acceptUnknown(ByteBuf frame, ClientServerInputMultiplexer multiplexer) {
return sendError(
multiplexer,
new InvalidSetupException(
"invalid setup frame: " + FrameHeaderFlyweight.frameType(frame)))
.doFinally(
signalType -> {
frame.release();
multiplexer.dispose();
});
}

private Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {
return ConnectionUtils.sendError(allocator, multiplexer, exception);
}

private Exception rejectedSetupError(Throwable err) {
String msg = err.getMessage();
return new RejectedSetupException(msg == null ? "rejected by server acceptor" : msg);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,40 +14,53 @@
* limitations under the License.
*/

package io.rsocket.internal;
package io.rsocket.core;

import static io.rsocket.keepalive.KeepAliveHandler.*;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.exceptions.UnsupportedSetupException;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.ResumeFrameFlyweight;
import io.rsocket.frame.SetupFrameFlyweight;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.resume.*;
import io.rsocket.util.ConnectionUtils;
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;
import reactor.core.publisher.Mono;

public interface ServerSetup {
abstract class ServerSetup {

Mono<Void> acceptRSocketSetup(
final ByteBufAllocator allocator;

public ServerSetup(ByteBufAllocator allocator) {
this.allocator = allocator;
}

abstract Mono<Void> acceptRSocketSetup(
ByteBuf frame,
ClientServerInputMultiplexer multiplexer,
BiFunction<KeepAliveHandler, ClientServerInputMultiplexer, Mono<Void>> then);

Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexer multiplexer);
abstract Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexer multiplexer);

default void dispose() {}
void dispose() {}

class DefaultServerSetup implements ServerSetup {
private final ByteBufAllocator allocator;
Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {
return multiplexer
.asSetupConnection()
.sendOne(ErrorFrameFlyweight.encode(allocator, 0, exception))
.onErrorResume(err -> Mono.empty());
}

public DefaultServerSetup(ByteBufAllocator allocator) {
this.allocator = allocator;
static class DefaultServerSetup extends ServerSetup {

DefaultServerSetup(ByteBufAllocator allocator) {
super(allocator);
}

@Override
Expand Down Expand Up @@ -78,27 +91,24 @@ public Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexe
multiplexer.dispose();
});
}

private Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {
return ConnectionUtils.sendError(allocator, multiplexer, exception);
}
}

class ResumableServerSetup implements ServerSetup {
static class ResumableServerSetup extends ServerSetup {
private final ByteBufAllocator allocator;
private final SessionManager sessionManager;
private final Duration resumeSessionDuration;
private final Duration resumeStreamTimeout;
private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
private final boolean cleanupStoreOnKeepAlive;

public ResumableServerSetup(
ResumableServerSetup(
ByteBufAllocator allocator,
SessionManager sessionManager,
Duration resumeSessionDuration,
Duration resumeStreamTimeout,
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory,
boolean cleanupStoreOnKeepAlive) {
super(allocator);
this.allocator = allocator;
this.sessionManager = sessionManager;
this.resumeSessionDuration = resumeSessionDuration;
Expand Down Expand Up @@ -155,10 +165,6 @@ public Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexe
}
}

private Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {
return ConnectionUtils.sendError(allocator, multiplexer, exception);
}

@Override
public void dispose() {
sessionManager.dispose();
Expand Down
Loading