diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/exceptions/TransportException.java b/reactivesocket-core/src/main/java/io/reactivesocket/exceptions/TransportException.java index e4e4029a0..5d53c40d8 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/exceptions/TransportException.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/exceptions/TransportException.java @@ -24,5 +24,4 @@ public TransportException(Throwable t) { public synchronized Throwable fillInStackTrace() { return this; } - } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java index 21eef8bcc..9fa027056 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java @@ -15,7 +15,6 @@ */ package io.reactivesocket.internal; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -837,9 +836,8 @@ public void onNext(Frame frame) { cancel(); } else if (type == FrameType.ERROR) { terminated.set(true); - final ByteBuffer byteBuffer = frame.getData(); - String errorMessage = getByteBufferAsString(byteBuffer); - onError(new RuntimeException(errorMessage)); + Throwable throwable = Exceptions.from(frame); + onError(throwable); cancel(); } else { onError(new RuntimeException("Unexpected FrameType: " + frame.getType())); diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java index 1d460b507..20a254a61 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java @@ -15,6 +15,7 @@ */ package io.reactivesocket; +import io.reactivesocket.exceptions.RejectedException; import io.reactivesocket.internal.Publishers; import io.reactivesocket.internal.Responder; import org.junit.After; @@ -182,7 +183,7 @@ public void testWriteWithoutLease() throws InterruptedException { TestSubscriber ts2 = new TestSubscriber<>(); response2.subscribe(ts2); ts2.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); - ts2.assertError(RuntimeException.class); + ts2.assertError(RejectedException.class); } @Test(timeout=2000) diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java index 91d45a66d..ba9a28124 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java @@ -21,6 +21,7 @@ import io.reactivesocket.LatchedCompletable; import io.reactivesocket.Payload; import io.reactivesocket.TestConnection; +import io.reactivesocket.exceptions.InvalidRequestException; import io.reactivesocket.util.PayloadImpl; import io.reactivex.Observable; import io.reactivex.subjects.ReplaySubject; @@ -165,7 +166,7 @@ public void testRequestResponseError() throws InterruptedException { conn.toInput.send(Frame.Error.from(2, new RuntimeException("Failed"))); ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); - ts.assertError(Exception.class); + ts.assertError(InvalidRequestException.class); assertEquals("Failed", ts.errors().get(0).getMessage()); } @@ -313,7 +314,7 @@ public void testRequestStreamError() throws InterruptedException { conn.toInput.send(utf8EncodedErrorFrame(2, "Failure")); ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); - ts.assertError(Exception.class); + ts.assertError(InvalidRequestException.class); ts.assertValue(utf8EncodedPayload("hello", null)); assertEquals("Failure", ts.errors().get(0).getMessage()); }