Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return RetriableRequestException for Netty Max Active Stream error #1001

Merged
merged 6 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.55.0] - 2024-05-23
- Allow HttpBridge to return RetriableRequestException for the Netty max active stream error

## [29.54.0] - 2024-05-08
- Dual read monitors cluster uris similarity

Expand Down Expand Up @@ -5689,7 +5692,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.54.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.55.0...master
[29.55.0]: https://github.com/linkedin/rest.li/compare/v29.54.0...v29.55.0
[29.54.0]: https://github.com/linkedin/rest.li/compare/v29.53.1...v29.54.0
[29.53.1]: https://github.com/linkedin/rest.li/compare/v29.53.0...v29.53.1
[29.53.0]: https://github.com/linkedin/rest.li/compare/v29.52.1...v29.53.0
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.54.0
version=29.55.0
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
1 change: 1 addition & 0 deletions r2-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies {
compile externalDependency.servletApi
compile externalDependency.mail
compile externalDependency.javaxActivation
compile externalDependency.netty
testCompile project(':r2-testutils')
testCompile project(':test-util')
testCompile externalDependency.testng
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.RetriableRequestException;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.rest.RestException;
import com.linkedin.r2.message.rest.RestRequest;
Expand All @@ -32,6 +33,7 @@
import com.linkedin.r2.transport.common.bridge.common.TransportResponse;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;

import io.netty.handler.codec.http2.Http2Exception;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
Expand All @@ -42,6 +44,9 @@
*/
public class HttpBridge
{
public static final String NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE =
"Maximum active streams violated for this endpoint";

/**
* Wrap application callback for incoming RestResponse with a "generic" HTTP callback.
*
Expand Down Expand Up @@ -132,6 +137,8 @@ public void onResponse(TransportResponse<RestResponse> response)

/**
* Wrap application callback for incoming StreamResponse with a "generic" HTTP callback.
* If callback returns the error which is in Netty Http2Exception.StreamException type,
* populate RetriableRequestException instead of RemoteInvocationException.
*
* @param callback the callback to receive the incoming RestResponse
* @param request the request, used only to provide useful context in case an error
Expand All @@ -149,11 +156,12 @@ public void onResponse(TransportResponse<StreamResponse> response)
{
if (response.hasError())
{
Throwable responseError = response.getError();
// If the error is due to the netty max active stream error, wrap it with RetriableRequestException instead
RemoteInvocationException exception =
wrapResponseError("Failed to get response from server for URI " + uri, responseError);
response =
TransportResponseImpl.error(new RemoteInvocationException("Failed to get response from server for URI "
+ uri,
response.getError()),
response.getWireAttributes());
TransportResponseImpl.error(exception, response.getWireAttributes());
}
else if (!RestStatus.isOK(response.getResponse().getStatus()))
{
Expand Down Expand Up @@ -209,6 +217,35 @@ public void onResponse(TransportResponse<StreamResponse> response)
};
}

/**
* Check if the error is due to the netty max active stream error.
* @param responseError Throwable error to check
* @return True if the error is due to the netty max active stream error, false otherwise
*/
private static boolean shouldReturnRetriableRequestException(Throwable responseError)
{
return responseError instanceof Http2Exception.StreamException
&& responseError.getMessage().contains(NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE);
}

/**
* Wrap the response error with the appropriate exception type.
* If the error is due to the netty max active stream, wrap it with RetriableRequestException.
* @param errorMessage Error message to wrap
* @param responseError Throwable error to wrap
* @return RemoteInvocationException or RetriableRequestException
*/
private static RemoteInvocationException wrapResponseError(String errorMessage, Throwable responseError) {
if (shouldReturnRetriableRequestException(responseError))
{
return new RetriableRequestException(errorMessage, responseError);
}
else
{
return new RemoteInvocationException(errorMessage, responseError);
}
}

/**
* Gets the URI to display in exception messages. The query parameters part of the URI is omitted to prevent
* displaying sensitive information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package test.r2.transport.http.common;

import com.linkedin.r2.RetriableRequestException;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -43,13 +46,18 @@
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.http.common.HttpBridge;

import static com.linkedin.r2.transport.http.common.HttpBridge.NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE;


/**
* @author Steven Ihde
* @version $Revision: $
*/

public class TestHttpBridge
{
private static final int REGULAR_STREAM_ID = 2; // Can not be 0 or 1 as they are reserved in Netty

@Test
public void testRestToHttpErrorMessage() throws TimeoutException, InterruptedException
{
Expand Down Expand Up @@ -144,4 +152,31 @@ public void testHttpToStreamErrorMessage() throws TimeoutException, InterruptedE
// propagating the actual exception
Assert.assertSame(resp, streamResponse);
}

@Test
public void testStreamToHttpWithRetriableRequestException() throws TimeoutException, InterruptedException
{
URI uri = URI.create("http://some.host");

RestRequest r = new RestRequestBuilder(uri).build();

FutureCallback<StreamResponse> futureCallback = new FutureCallback<>();
TransportCallback<StreamResponse> callback = new TransportCallbackAdapter<>(futureCallback);
TransportCallback<StreamResponse> bridgeCallback = HttpBridge.streamToHttpCallback(callback,
Messages.toStreamRequest(r));

bridgeCallback.onResponse(TransportResponseImpl.<StreamResponse>error(
Http2Exception.streamError(REGULAR_STREAM_ID, Http2Error.REFUSED_STREAM,
NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE + ": 200")));

try
{
futureCallback.get(30, TimeUnit.SECONDS);
Assert.fail("get should have thrown exception");
}
catch (ExecutionException e)
{
Assert.assertTrue(e.getCause() instanceof RetriableRequestException);
}
}
}