From d3b75246f1c16e800fa30c082a9a5b70533f805d Mon Sep 17 00:00:00 2001 From: xintwu Date: Wed, 22 May 2024 18:30:14 -0700 Subject: [PATCH 1/6] Return RetriableRequestException for Netty Max Active Stream error --- .../r2/transport/http/common/HttpBridge.java | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java b/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java index 2f9e3632da..e180a491f7 100644 --- a/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java +++ b/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java @@ -19,6 +19,7 @@ import com.linkedin.r2.RemoteInvocationException; +import com.linkedin.r2.RetriableRequestException; import com.linkedin.r2.message.Request; import com.linkedin.r2.message.rest.RestException; import com.linkedin.r2.message.rest.RestRequest; @@ -42,6 +43,9 @@ */ public class HttpBridge { + private static final String NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE = + "Maximum active streams violated for this endpoint"; + /** * Wrap application callback for incoming RestResponse with a "generic" HTTP callback. * @@ -149,11 +153,21 @@ public void onResponse(TransportResponse response) { if (response.hasError()) { - response = - TransportResponseImpl.error(new RemoteInvocationException("Failed to get response from server for URI " - + uri, - response.getError()), - response.getWireAttributes()); + Throwable responseError = response.getError(); + // If the error is due to the netty max active stream error, return a RetriableRequestException + if (shouldReturnRetriableRequestException(responseError)) + { + StreamException streamException = (StreamException) responseError; + response = TransportResponseImpl.error( + new RetriableRequestException("Failed to get response from server for URI " + uri, streamException), + response.getWireAttributes()); + } + else + { + response = TransportResponseImpl.error( + new RemoteInvocationException("Failed to get response from server for URI " + uri, responseError), + response.getWireAttributes()); + } } else if (!RestStatus.isOK(response.getResponse().getStatus())) { @@ -209,6 +223,12 @@ public void onResponse(TransportResponse response) }; } + private static boolean shouldReturnRetriableRequestException(Throwable responseError) + { + return responseError instanceof StreamException && responseError.getMessage() + .contains(NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE); + } + /** * Gets the URI to display in exception messages. The query parameters part of the URI is omitted to prevent * displaying sensitive information. From 0f01f2c11b7effc5718b8422db49ccff0fcdb00d Mon Sep 17 00:00:00 2001 From: xintwu Date: Thu, 23 May 2024 14:51:10 -0700 Subject: [PATCH 2/6] Use Netty H2 Exception in condition checking. Add unit test. --- r2-core/build.gradle | 1 + .../r2/transport/http/common/HttpBridge.java | 39 ++++++++++++++++--- .../transport/http/common/TestHttpBridge.java | 35 +++++++++++++++++ 3 files changed, 70 insertions(+), 5 deletions(-) diff --git a/r2-core/build.gradle b/r2-core/build.gradle index 75e0fadaa6..870052b99d 100644 --- a/r2-core/build.gradle +++ b/r2-core/build.gradle @@ -5,6 +5,7 @@ dependencies { compile externalDependency.servletApi compile externalDependency.mail compile externalDependency.javaxActivation + compile externalDependency.netty testCompile project(':r2-testutils') testCompile project(':test-util') testCompile externalDependency.testng diff --git a/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java b/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java index e180a491f7..15bea3948a 100644 --- a/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java +++ b/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java @@ -33,6 +33,7 @@ import com.linkedin.r2.transport.common.bridge.common.TransportResponse; import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl; +import io.netty.handler.codec.http2.Http2Exception; import java.net.URI; import java.net.URISyntaxException; import java.util.Map; @@ -43,7 +44,7 @@ */ public class HttpBridge { - private static final String NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE = + public static final String NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE = "Maximum active streams violated for this endpoint"; /** @@ -65,6 +66,10 @@ public void onResponse(TransportResponse response) { if (response.hasError()) { + Throwable responseError = response.getError(); + // If the error is due to the netty max active stream error, wrap it with RetriableRequestException instead + RemoteInvocationException exception = + wrapResponseError("Failed to get response from server for URI " + uri, responseError); response = TransportResponseImpl.error(new RemoteInvocationException("Failed to get response from server for URI " + uri, @@ -136,6 +141,8 @@ public void onResponse(TransportResponse response) /** * Wrap application callback for incoming StreamResponse with a "generic" HTTP callback. + * If callback returns the error which is in Netty Http2Exception.StreamException type, + * populate RetriableRequestException instead of RemoteInvocationException. * * @param callback the callback to receive the incoming RestResponse * @param request the request, used only to provide useful context in case an error @@ -157,9 +164,8 @@ public void onResponse(TransportResponse response) // If the error is due to the netty max active stream error, return a RetriableRequestException if (shouldReturnRetriableRequestException(responseError)) { - StreamException streamException = (StreamException) responseError; response = TransportResponseImpl.error( - new RetriableRequestException("Failed to get response from server for URI " + uri, streamException), + new RetriableRequestException("Failed to get response from server for URI " + uri, responseError), response.getWireAttributes()); } else @@ -223,10 +229,33 @@ public void onResponse(TransportResponse response) }; } + /** + * Check if the error is due to the netty max active stream error. + * @param responseError Throwable error to check + * @return True if the error is due to the netty max active stream error, false otherwise + */ private static boolean shouldReturnRetriableRequestException(Throwable responseError) { - return responseError instanceof StreamException && responseError.getMessage() - .contains(NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE); + return responseError instanceof Http2Exception.StreamException + && responseError.getMessage().contains(NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE); + } + + /** + * Wrap the response error with the appropriate exception type. + * If the error is due to the netty max active stream, wrap it with RetriableRequestException. + * @param errorMessage Error message to wrap + * @param responseError Throwable error to wrap + * @return RemoteInvocationException or RetriableRequestException + */ + private static RemoteInvocationException wrapResponseError(String errorMessage, Throwable responseError) { + if (shouldReturnRetriableRequestException(responseError)) + { + return new RetriableRequestException(errorMessage, responseError); + } + else + { + return new RemoteInvocationException(errorMessage, responseError); + } } /** diff --git a/r2-core/src/test/java/test/r2/transport/http/common/TestHttpBridge.java b/r2-core/src/test/java/test/r2/transport/http/common/TestHttpBridge.java index bbc7b23920..589d7dea54 100644 --- a/r2-core/src/test/java/test/r2/transport/http/common/TestHttpBridge.java +++ b/r2-core/src/test/java/test/r2/transport/http/common/TestHttpBridge.java @@ -20,6 +20,9 @@ package test.r2.transport.http.common; +import com.linkedin.r2.RetriableRequestException; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2Exception; import java.net.URI; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -43,6 +46,9 @@ import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl; import com.linkedin.r2.transport.http.common.HttpBridge; +import static com.linkedin.r2.transport.http.common.HttpBridge.NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE; + + /** * @author Steven Ihde * @version $Revision: $ @@ -50,6 +56,8 @@ public class TestHttpBridge { + private static final int REGULAR_STREAM_ID = 2; // Can not be 0 or 1 as they are reserved in Netty + @Test public void testRestToHttpErrorMessage() throws TimeoutException, InterruptedException { @@ -144,4 +152,31 @@ public void testHttpToStreamErrorMessage() throws TimeoutException, InterruptedE // propagating the actual exception Assert.assertSame(resp, streamResponse); } + + @Test + public void testStreamToHttpWithRetriableRequestException() throws TimeoutException, InterruptedException + { + URI uri = URI.create("http://some.host"); + + RestRequest r = new RestRequestBuilder(uri).build(); + + FutureCallback futureCallback = new FutureCallback<>(); + TransportCallback callback = new TransportCallbackAdapter<>(futureCallback); + TransportCallback bridgeCallback = HttpBridge.streamToHttpCallback(callback, + Messages.toStreamRequest(r)); + + bridgeCallback.onResponse(TransportResponseImpl.error( + Http2Exception.streamError(REGULAR_STREAM_ID, Http2Error.REFUSED_STREAM, + NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE + ": 200"))); + + try + { + futureCallback.get(30, TimeUnit.SECONDS); + Assert.fail("get should have thrown exception"); + } + catch (ExecutionException e) + { + Assert.assertTrue(e.getCause() instanceof RetriableRequestException); + } + } } From 294a06adfca2f5444372d15cfb81c56122242656 Mon Sep 17 00:00:00 2001 From: xintwu Date: Thu, 23 May 2024 15:09:28 -0700 Subject: [PATCH 3/6] Refactor code --- .../r2/transport/http/common/HttpBridge.java | 22 +++++-------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java b/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java index 15bea3948a..f9392e8e0a 100644 --- a/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java +++ b/r2-core/src/main/java/com/linkedin/r2/transport/http/common/HttpBridge.java @@ -66,10 +66,6 @@ public void onResponse(TransportResponse response) { if (response.hasError()) { - Throwable responseError = response.getError(); - // If the error is due to the netty max active stream error, wrap it with RetriableRequestException instead - RemoteInvocationException exception = - wrapResponseError("Failed to get response from server for URI " + uri, responseError); response = TransportResponseImpl.error(new RemoteInvocationException("Failed to get response from server for URI " + uri, @@ -161,19 +157,11 @@ public void onResponse(TransportResponse response) if (response.hasError()) { Throwable responseError = response.getError(); - // If the error is due to the netty max active stream error, return a RetriableRequestException - if (shouldReturnRetriableRequestException(responseError)) - { - response = TransportResponseImpl.error( - new RetriableRequestException("Failed to get response from server for URI " + uri, responseError), - response.getWireAttributes()); - } - else - { - response = TransportResponseImpl.error( - new RemoteInvocationException("Failed to get response from server for URI " + uri, responseError), - response.getWireAttributes()); - } + // If the error is due to the netty max active stream error, wrap it with RetriableRequestException instead + RemoteInvocationException exception = + wrapResponseError("Failed to get response from server for URI " + uri, responseError); + response = + TransportResponseImpl.error(exception, response.getWireAttributes()); } else if (!RestStatus.isOK(response.getResponse().getStatus())) { From e0a4590ca1454ecc27e9754dcfd167ee4cbb3195 Mon Sep 17 00:00:00 2001 From: xintwu Date: Thu, 23 May 2024 15:36:26 -0700 Subject: [PATCH 4/6] Update changelog & bump version --- CHANGELOG.md | 6 +++++- gradle.properties | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53f1f4bdcb..0809448634 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.55.0] - 2024-05-23 +- Allow HttpBridge to return RetriableRequestException for the Netty max active stream error + ## [29.54.0] - 2024-05-08 - Dual read monitors cluster uris similarity @@ -5689,7 +5692,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.54.0...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.55.0...master +[29.55.0]: https://github.com/linkedin/rest.li/compare/v29.54.0...v29.55.0 [29.54.0]: https://github.com/linkedin/rest.li/compare/v29.53.1...v29.54.0 [29.53.1]: https://github.com/linkedin/rest.li/compare/v29.53.0...v29.53.1 [29.53.0]: https://github.com/linkedin/rest.li/compare/v29.52.1...v29.53.0 diff --git a/gradle.properties b/gradle.properties index 61d65c82df..f377af2b28 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.54.0 +version=29.55.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true From ca023a7da41d7005b479791278013a9966708caf Mon Sep 17 00:00:00 2001 From: xintwu Date: Thu, 23 May 2024 16:02:33 -0700 Subject: [PATCH 5/6] Update version for testing --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index f377af2b28..299cc734b5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.55.0 +version=29.55.0-rc.1 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true From c26002016b7d79609f224b26dc417385f8d9e7fd Mon Sep 17 00:00:00 2001 From: xintwu Date: Fri, 24 May 2024 13:53:15 -0700 Subject: [PATCH 6/6] Revert the release-candidate version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 299cc734b5..f377af2b28 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.55.0-rc.1 +version=29.55.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true