From 338abafc7630a2668013883bafce90e37c1d8d2c Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Fri, 15 Jul 2016 11:00:24 -0700 Subject: [PATCH] Properly propagate exceptions type from server client. ***Problem*** Errors generated from the `Responder` are serialized according to the spec https://github.com/ReactiveSocket/reactivesocket/blob/master/Protocol.md#error-codes but the type is lost when deserializing by the `Requester`. ***Solution*** Instead of generating a RuntimeException containing the string of the error, generate the right Exception type based on the error code. This allow user code, or filter to make smart decision based on the Exception type (Retryable, ...) --- .../io/reactivesocket/exceptions/TransportException.java | 1 - .../src/main/java/io/reactivesocket/internal/Requester.java | 6 ++---- .../src/test/java/io/reactivesocket/LeaseTest.java | 3 ++- .../test/java/io/reactivesocket/internal/RequesterTest.java | 5 +++-- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/exceptions/TransportException.java b/reactivesocket-core/src/main/java/io/reactivesocket/exceptions/TransportException.java index e4e4029a0..5d53c40d8 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/exceptions/TransportException.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/exceptions/TransportException.java @@ -24,5 +24,4 @@ public TransportException(Throwable t) { public synchronized Throwable fillInStackTrace() { return this; } - } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java index 21eef8bcc..9fa027056 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java @@ -15,7 +15,6 @@ */ package io.reactivesocket.internal; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -837,9 +836,8 @@ public void onNext(Frame frame) { cancel(); } else if (type == FrameType.ERROR) { terminated.set(true); - final ByteBuffer byteBuffer = frame.getData(); - String errorMessage = getByteBufferAsString(byteBuffer); - onError(new RuntimeException(errorMessage)); + Throwable throwable = Exceptions.from(frame); + onError(throwable); cancel(); } else { onError(new RuntimeException("Unexpected FrameType: " + frame.getType())); diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java index 1d460b507..20a254a61 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java @@ -15,6 +15,7 @@ */ package io.reactivesocket; +import io.reactivesocket.exceptions.RejectedException; import io.reactivesocket.internal.Publishers; import io.reactivesocket.internal.Responder; import org.junit.After; @@ -182,7 +183,7 @@ public void testWriteWithoutLease() throws InterruptedException { TestSubscriber ts2 = new TestSubscriber<>(); response2.subscribe(ts2); ts2.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); - ts2.assertError(RuntimeException.class); + ts2.assertError(RejectedException.class); } @Test(timeout=2000) diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java index 91d45a66d..ba9a28124 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java @@ -21,6 +21,7 @@ import io.reactivesocket.LatchedCompletable; import io.reactivesocket.Payload; import io.reactivesocket.TestConnection; +import io.reactivesocket.exceptions.InvalidRequestException; import io.reactivesocket.util.PayloadImpl; import io.reactivex.Observable; import io.reactivex.subjects.ReplaySubject; @@ -165,7 +166,7 @@ public void testRequestResponseError() throws InterruptedException { conn.toInput.send(Frame.Error.from(2, new RuntimeException("Failed"))); ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); - ts.assertError(Exception.class); + ts.assertError(InvalidRequestException.class); assertEquals("Failed", ts.errors().get(0).getMessage()); } @@ -313,7 +314,7 @@ public void testRequestStreamError() throws InterruptedException { conn.toInput.send(utf8EncodedErrorFrame(2, "Failure")); ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); - ts.assertError(Exception.class); + ts.assertError(InvalidRequestException.class); ts.assertValue(utf8EncodedPayload("hello", null)); assertEquals("Failure", ts.errors().get(0).getMessage()); }