Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,25 +252,19 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
Frame.Request.from(streamId, FrameType.REQUEST_STREAM, payload, l);

sendProcessor.onNext(requestFrame);
} else if (contains(streamId)
&& connection.availability() > 0.0
&& !receiver.isTerminated()) {
} else if (contains(streamId) && !receiver.isTerminated()) {
sendProcessor.onNext(Frame.RequestN.from(streamId, l));
}
})
.doOnError(
t -> {
if (contains(streamId)
&& connection.availability() > 0.0
&& !receiver.isTerminated()) {
if (contains(streamId) && !receiver.isTerminated()) {
sendProcessor.onNext(Frame.Error.from(streamId, t));
}
})
.doOnCancel(
() -> {
if (contains(streamId)
&& connection.availability() > 0.0
&& !receiver.isTerminated()) {
if (contains(streamId) && !receiver.isTerminated()) {
sendProcessor.onNext(Frame.Cancel.from(streamId));
}
})
Expand Down Expand Up @@ -311,9 +305,7 @@ private Flux<Payload> handleStreamResponse(Flux<Payload> request, FrameType requ
boolean firstRequest = true;

boolean isValidToSendFrame() {
return contains(streamId)
&& connection.availability() > 0.0
&& !receiver.isTerminated();
return contains(streamId) && !receiver.isTerminated();
}

void sendOneFrame(Frame frame) {
Expand Down
12 changes: 3 additions & 9 deletions rsocket-core/src/main/java/io/rsocket/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -330,21 +330,15 @@ private Mono<Void> handleChannel(int streamId, Frame firstFrame) {
frames
.doOnCancel(
() -> {
if (connection.availability() > 0.0) {
sendProcessor.onNext(Frame.Cancel.from(streamId));
}
sendProcessor.onNext(Frame.Cancel.from(streamId));
})
.doOnError(
t -> {
if (connection.availability() > 0.0) {
sendProcessor.onNext(Frame.Error.from(streamId, t));
}
sendProcessor.onNext(Frame.Error.from(streamId, t));
})
.doOnRequest(
l -> {
if (connection.availability() > 0.0) {
sendProcessor.onNext(Frame.RequestN.from(streamId, l));
}
sendProcessor.onNext(Frame.RequestN.from(streamId, l));
})
.doFinally(signalType -> removeChannelProcessor(streamId));

Expand Down
7 changes: 7 additions & 0 deletions rsocket-core/src/test/java/io/rsocket/AbstractSocketRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.rsocket.test.util.TestDuplexConnection;
import io.rsocket.test.util.TestSubscriber;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
Expand Down Expand Up @@ -50,4 +51,10 @@ protected void init() {
}

protected abstract T newRSocket();

public void assertNoConnectionErrors() {
if (errors.size() > 1) {
Assert.fail("No connection errors expected: " + errors.peek().toString());
}
}
}
13 changes: 8 additions & 5 deletions rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -158,17 +157,21 @@ public void testRequestReplyWithCancel() {
}

@Test(timeout = 2_000)
@Ignore
public void testRequestReplyErrorOnSend() {
rule.connection.setAvailability(0); // Fails send
Mono<Payload> response = rule.socket.requestResponse(PayloadImpl.EMPTY);
Subscriber<Payload> responseSub = TestSubscriber.create();
Subscriber<Payload> responseSub = TestSubscriber.create(10);
response.subscribe(responseSub);

verify(responseSub).onError(any(RuntimeException.class));
this.rule.assertNoConnectionErrors();

verify(responseSub).onSubscribe(any(Subscription.class));

// TODO this should get the error reported through the response subscription
// verify(responseSub).onError(any(RuntimeException.class));
}

@Test
@Test(timeout = 2_000)
public void testLazyRequestResponse() {
Publisher<Payload> response = rule.socket.requestResponse(PayloadImpl.EMPTY);
int streamId = sendRequestResponse(response);
Expand Down
35 changes: 32 additions & 3 deletions rsocket-core/src/test/java/io/rsocket/RSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import org.hamcrest.MatcherAssert;
import org.junit.Ignore;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
Expand All @@ -54,7 +54,6 @@ public void testRequestReplyNoError() {
}

@Test(timeout = 2000)
@Ignore
public void testHandlerEmitsError() {
rule.setRequestAcceptor(
new AbstractRSocket() {
Expand All @@ -66,7 +65,11 @@ public Mono<Payload> requestResponse(Payload payload) {
Subscriber<Payload> subscriber = TestSubscriber.create();
rule.crs.requestResponse(PayloadImpl.EMPTY).subscribe(subscriber);
verify(subscriber).onError(any(ApplicationException.class));
rule.assertNoErrors();

// Client sees error through normal API
rule.assertNoClientErrors();

rule.assertServerError("java.lang.NullPointerException: Deliberate exception.");
}

@Test(timeout = 2000)
Expand Down Expand Up @@ -149,10 +152,36 @@ public void setRequestAcceptor(RSocket requestAcceptor) {
}

public void assertNoErrors() {
assertNoClientErrors();
assertNoServerErrors();
}

public void assertNoClientErrors() {
MatcherAssert.assertThat(
"Unexpected error on the client connection.", clientErrors, is(empty()));
}

public void assertNoServerErrors() {
MatcherAssert.assertThat(
"Unexpected error on the server connection.", serverErrors, is(empty()));
}

public void assertClientError(String s) {
assertError(s, "client", this.clientErrors);
}

public void assertServerError(String s) {
assertError(s, "server", this.serverErrors);
}
}

public static void assertError(String s, String mode, ArrayList<Throwable> errors) {
for (Throwable t : errors) {
if (t.toString().equals(s)) {
return;
}
}

Assert.fail("Expected " + mode + " connection error: " + s + " other errors " + errors.size());
}
}