Skip to content

Commit

Permalink
Endpoint selection is retried on selection timeout (#4269)
Browse files Browse the repository at this point in the history
Motivation:

- Differentiate the cases where "endpoint is set to `null`" vs. "endpoint selection timed out" for easier debugging.
  - `EmptyEndpointGroupException`: If an `EndpointGroup` sets the endpoint as `null`
  - `EndpointSelectionTimeoutException`: If the `Endpoint` selection timed out.
- Add the capability to retry endpoint selection from an `EndpointGroup` if not selected yet.

Modifications:

- Define a helper class `ClientPendingThrowableUtil` which contains helper methods for setting a new attribute `CLIENT_PENDING_THROWABLE`.
  - This throwable is thrown after decorators are processed, but before the actual client execution starts.
  - This attribute should be removed if a user wants to re-use a derived context.
    - An example of this can be seen at `RetryingClient`. `RetryingClient` works by creating a derived context from the original context, and executing requests. If the original context contains `CLIENT_PENDING_THROWABLE`, then derived contexts would also contain the `CLIENT_PENDING_THROWABLE` and the retried requests would fail immediately. Hence, it is necessary that `CLIENT_PENDING_THROWABLE` be cleared in the derived context.
- Set `CLIENT_PENDING_THROWABLE` to `EndpointSelectionTimeoutException` when an `Endpoint` selection times out.
- Modify `RetryingClient`, `RetryingRpcClient`:
  - Check whether the context has an `EndpointGroup` but no `Endpoint` yet, which means the previous endpoint selection failed. If an endpoint hasn't been selected yet, call `initContextAndExecuteWithFallback`, which attempts endpoint selection again.

Result:

- `EndpointSelectionTimeoutException` is thrown on selection timeout
- Retry is done on selection timeout
  • Loading branch information
jrhee17 committed Jul 2, 2022
1 parent cc8e707 commit 6f256cb
Show file tree
Hide file tree
Showing 14 changed files with 687 additions and 18 deletions.
Expand Up @@ -34,6 +34,7 @@
import com.linecorp.armeria.common.logging.ClientConnectionTimingsBuilder;
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil;
import com.linecorp.armeria.internal.common.PathAndQuery;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.server.ProxiedAddresses;
Expand All @@ -57,6 +58,11 @@ final class HttpClientDelegate implements HttpClient {

@Override
public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception {
final Throwable throwable = ClientPendingThrowableUtil.pendingThrowable(ctx);
if (throwable != null) {
return earlyFailedResponse(throwable, ctx, req);
}

final Endpoint endpoint = ctx.endpoint();
if (endpoint == null) {
// It is possible that we reach here even when `EndpointGroup` is not empty,
Expand All @@ -69,17 +75,11 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
// and response created here will be exposed only when `EndpointGroup.select()` returned `null`.
//
// See `DefaultClientRequestContext.init()` for more information.
final UnprocessedRequestException cause =
UnprocessedRequestException.of(EmptyEndpointGroupException.get(ctx.endpointGroup()));
handleEarlyRequestException(ctx, req, cause);
return HttpResponse.ofFailure(cause);
return earlyFailedResponse(EmptyEndpointGroupException.get(ctx.endpointGroup()), ctx, req);
}

if (!isValidPath(req)) {
final UnprocessedRequestException cause = UnprocessedRequestException.of(
new IllegalArgumentException("invalid path: " + req.path()));
handleEarlyRequestException(ctx, req, cause);
return HttpResponse.ofFailure(cause);
return earlyFailedResponse(new IllegalArgumentException("invalid path: " + req.path()), ctx, req);
}

final SessionProtocol protocol = ctx.sessionProtocol();
Expand Down Expand Up @@ -222,6 +222,12 @@ private static boolean isValidPath(HttpRequest req) {
return PathAndQuery.parse(req.path()) != null;
}

/**
 * Builds the failed {@link HttpResponse} returned when a request is rejected before it is sent.
 *
 * <p>The given {@link Throwable} is wrapped with {@link UnprocessedRequestException#of(Throwable)},
 * the wrapped exception is passed to {@code handleEarlyRequestException(ctx, req, ...)}, and the
 * same wrapped exception becomes the failure cause of the returned response.
 *
 * @param t the reason the request could not be processed
 * @param ctx the context of the rejected request
 * @param req the rejected request
 * @return a response that has failed with the wrapped {@link UnprocessedRequestException}
 */
private static HttpResponse earlyFailedResponse(Throwable t, ClientRequestContext ctx, HttpRequest req) {
    final UnprocessedRequestException unprocessed = UnprocessedRequestException.of(t);
    // Notify the early-failure handler first, then hand the very same exception to the response.
    handleEarlyRequestException(ctx, req, unprocessed);
    return HttpResponse.ofFailure(unprocessed);
}

private static void handleEarlyRequestException(ClientRequestContext ctx,
HttpRequest req, Throwable cause) {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.UnmodifiableFuture;
import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil;

/**
* A skeletal {@link EndpointSelector} implementation. This abstract class implements the
Expand Down Expand Up @@ -75,8 +76,17 @@ public final CompletableFuture<Endpoint> select(ClientRequestContext ctx,

// Schedule the timeout task.
final ScheduledFuture<?> timeoutFuture =
executor.schedule(() -> listeningFuture.complete(null),
timeoutMillis, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
final EndpointSelectionTimeoutException ex =
EndpointSelectionTimeoutException.get(endpointGroup,
timeoutMillis);
ClientPendingThrowableUtil.setPendingThrowable(ctx, ex);
// Don't complete exceptionally so that the throwable
// can be handled after executing the attached decorators
listeningFuture.complete(null);
},
timeoutMillis,
TimeUnit.MILLISECONDS);
listeningFuture.timeoutFuture = timeoutFuture;

// Cancel the timeout task if listeningFuture is done already.
Expand Down
@@ -0,0 +1,56 @@
/*
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client.endpoint;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.annotation.UnstableApi;

/**
 * An {@link EndpointGroupException} that signals an {@link EndpointGroup} could not provide
 * an {@link Endpoint} before the configured selection timeout elapsed.
 */
@UnstableApi
public final class EndpointSelectionTimeoutException extends EndpointGroupException {

    private static final long serialVersionUID = -3079582212067997365L;

    // Shared message-less instance handed out whenever the verbose exception sampler
    // does not sample this exception type.
    private static final EndpointSelectionTimeoutException INSTANCE =
            new EndpointSelectionTimeoutException();

    private EndpointSelectionTimeoutException() {}

    private EndpointSelectionTimeoutException(EndpointGroup endpointGroup, long selectionTimeoutMillis) {
        super("Failed to select within " + selectionTimeoutMillis + " ms an endpoint from: " + endpointGroup);
    }

    /**
     * Returns an {@link EndpointSelectionTimeoutException} for the specified {@link EndpointGroup}.
     *
     * <p>When {@link Flags#verboseExceptionSampler()} samples this exception type, a new instance
     * whose message contains the timeout and the {@link EndpointGroup} is created. Otherwise
     * a shared instance without a message is returned.
     *
     * @param endpointGroup the {@link EndpointGroup} whose selection timed out
     * @param selectionTimeoutMillis the selection timeout in milliseconds; must not be negative
     * @throws NullPointerException if {@code endpointGroup} is {@code null}
     * @throws IllegalArgumentException if {@code selectionTimeoutMillis} is negative
     */
    public static EndpointSelectionTimeoutException get(EndpointGroup endpointGroup,
                                                        long selectionTimeoutMillis) {
        requireNonNull(endpointGroup, "endpointGroup");
        checkArgument(selectionTimeoutMillis >= 0, "selectionTimeoutMillis: %s (expected: >= 0)",
                      selectionTimeoutMillis);

        final boolean verbose =
                Flags.verboseExceptionSampler().isSampled(EndpointSelectionTimeoutException.class);
        if (!verbose) {
            return INSTANCE;
        }
        return new EndpointSelectionTimeoutException(endpointGroup, selectionTimeoutMillis);
    }
}
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.linecorp.armeria.internal.client.ClientUtil.executeWithFallback;
import static com.linecorp.armeria.internal.client.ClientUtil.initContextAndExecuteWithFallback;

import java.time.Duration;
import java.util.Date;
Expand All @@ -29,8 +30,10 @@
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.DefaultClientRequestContext;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.ResponseTimeoutException;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpRequest;
Expand All @@ -45,6 +48,7 @@
import com.linecorp.armeria.common.logging.RequestLogProperty;
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.internal.client.AggregatedHttpRequestDuplicator;
import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil;
import com.linecorp.armeria.internal.client.TruncatingHttpResponse;

import io.netty.handler.codec.DateFormatter;
Expand Down Expand Up @@ -303,8 +307,20 @@ private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqD
return;
}

final HttpResponse response = executeWithFallback(unwrap(), derivedCtx,
(context, cause) -> HttpResponse.ofFailure(cause));
final HttpResponse response;
final EndpointGroup endpointGroup = derivedCtx.endpointGroup();
if (!initialAttempt && derivedCtx instanceof DefaultClientRequestContext &&
endpointGroup != null && derivedCtx.endpoint() == null) {
// clear the pending throwable to retry endpoint selection
ClientPendingThrowableUtil.removePendingThrowable(derivedCtx);
// if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop
final DefaultClientRequestContext casted = (DefaultClientRequestContext) derivedCtx;
response = initContextAndExecuteWithFallback(unwrap(), casted, endpointGroup, HttpResponse::from,
(context, cause) -> HttpResponse.ofFailure(cause));
} else {
response = executeWithFallback(unwrap(), derivedCtx,
(context, cause) -> HttpResponse.ofFailure(cause));
}

final RetryConfig<HttpResponse> config = mapping().get(ctx, duplicateReq);
if (!ctx.exchangeType().isResponseStreaming() || config.requiresResponseTrailers()) {
Expand Down
Expand Up @@ -16,18 +16,22 @@
package com.linecorp.armeria.client.retry;

import static com.linecorp.armeria.internal.client.ClientUtil.executeWithFallback;
import static com.linecorp.armeria.internal.client.ClientUtil.initContextAndExecuteWithFallback;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.DefaultClientRequestContext;
import com.linecorp.armeria.client.ResponseTimeoutException;
import com.linecorp.armeria.client.RpcClient;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.common.RpcRequest;
import com.linecorp.armeria.common.RpcResponse;
import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil;
import com.linecorp.armeria.internal.common.util.StringUtil;

/**
Expand Down Expand Up @@ -166,8 +170,21 @@ private void doExecute0(ClientRequestContext ctx, RpcRequest req,
mutator -> mutator.add(ARMERIA_RETRY_COUNT, StringUtil.toString(totalAttempts - 1)));
}

final RpcResponse res = executeWithFallback(unwrap(), derivedCtx,
final RpcResponse res;

final EndpointGroup endpointGroup = derivedCtx.endpointGroup();
if (!initialAttempt && derivedCtx instanceof DefaultClientRequestContext &&
endpointGroup != null && derivedCtx.endpoint() == null) {
// clear the pending throwable to retry endpoint selection
ClientPendingThrowableUtil.removePendingThrowable(derivedCtx);
// if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop
final DefaultClientRequestContext casted = (DefaultClientRequestContext) derivedCtx;
res = initContextAndExecuteWithFallback(unwrap(), casted, endpointGroup, RpcResponse::from,
(context, cause) -> RpcResponse.ofFailure(cause));
} else {
res = executeWithFallback(unwrap(), derivedCtx,
(context, cause) -> RpcResponse.ofFailure(cause));
}

final RetryConfig<RpcResponse> retryConfig = mapping().get(ctx, req);
final RetryRuleWithContent<RpcResponse> retryRule =
Expand Down
@@ -0,0 +1,116 @@
/*
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.internal.client;

import static java.util.Objects.requireNonNull;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.Exceptions;

import io.netty.util.AttributeKey;

/**
 * Utility methods for attaching, reading and clearing a pending {@link Throwable} on a
 * {@link ClientRequestContext}. A pending throwable is thrown after all decorators are executed,
 * but before the actual client execution starts.
 *
 * <p>For example:<pre>{@code
 * final RuntimeException e = new RuntimeException();
 * final WebClient webClient =
 *         WebClient.builder(SessionProtocol.HTTP, endpointGroup)
 *                  .contextCustomizer(ctx -> setPendingThrowable(ctx, e))
 *                  .decorator(LoggingClient.newDecorator()) // the request is logged
 *                  .build();
 * assertThatThrownBy(() -> webClient.blocking().get("/"))
 *         .isInstanceOf(UnprocessedRequestException.class)
 *         .hasCause(e);
 * }</pre>
 */
public final class ClientPendingThrowableUtil {

    // Context attribute under which the pending throwable is stored.
    private static final AttributeKey<Throwable> CLIENT_PENDING_THROWABLE =
            AttributeKey.valueOf(ClientPendingThrowableUtil.class, "CLIENT_PENDING_THROWABLE");

    private ClientPendingThrowableUtil() {}

    /**
     * Stores {@code cause} as the pending {@link Throwable} of the specified
     * {@link ClientRequestContext}. The throwable is first unwrapped via
     * {@link Exceptions#peel(Throwable)}, so the peeled throwable is what gets stored.
     *
     * <p>For example:<pre>{@code
     * final RuntimeException e = new RuntimeException();
     * final CompletionException wrapper = new CompletionException(e);
     * final ClientRequestContext ctx = ...
     * setPendingThrowable(ctx, wrapper);
     * final Throwable throwable = pendingThrowable(ctx);
     * assert throwable != null;
     * assertThat(throwable).isEqualTo(e);
     * }</pre>
     *
     * @param ctx the context to attach the throwable to
     * @param cause the throwable to attach
     * @throws NullPointerException if {@code ctx} or {@code cause} is {@code null}
     */
    public static void setPendingThrowable(ClientRequestContext ctx, Throwable cause) {
        requireNonNull(ctx, "ctx");
        requireNonNull(cause, "cause");
        final Throwable peeled = Exceptions.peel(cause);
        ctx.setAttr(CLIENT_PENDING_THROWABLE, peeled);
    }

    /**
     * Returns the pending {@link Throwable} of the specified {@link ClientRequestContext},
     * or {@code null} if none is set. Note that a derived context will also contain this
     * attribute by default, and can therefore fail requests immediately. Use
     * {@link #removePendingThrowable(ClientRequestContext)} to clear it.
     *
     * <p>For example:<pre>{@code
     * ClientRequestContext ctx = ...;
     * Throwable t = ...;
     * final Throwable t1 = pendingThrowable(ctx);
     * assert t1 == null;
     * setPendingThrowable(ctx, t);
     * final Throwable t2 = pendingThrowable(ctx);
     * assert t2 == t;
     * final ClientRequestContext derived = ctx.newDerivedContext(id, req, rpcReq, endpoint);
     * final Throwable t3 = pendingThrowable(derived);
     * assert t3 == t;
     * }</pre>
     *
     * @param ctx the context to read the throwable from
     * @throws NullPointerException if {@code ctx} is {@code null}
     */
    @Nullable
    public static Throwable pendingThrowable(ClientRequestContext ctx) {
        requireNonNull(ctx, "ctx");
        return ctx.attr(CLIENT_PENDING_THROWABLE);
    }

    /**
     * Clears the pending throwable previously stored by
     * {@link #setPendingThrowable(ClientRequestContext, Throwable)}. Does nothing when the
     * context does not have the attribute.
     *
     * <p>For example:<pre>{@code
     * final Throwable throwable = pendingThrowable(ctx);
     * assert throwable != null;
     * assertThat(throwable).isEqualTo(e);
     * removePendingThrowable(ctx);
     * assertThat(pendingThrowable(ctx)).isNull();
     * }</pre>
     *
     * @param ctx the context to clear the throwable from
     * @throws NullPointerException if {@code ctx} is {@code null}
     */
    public static void removePendingThrowable(ClientRequestContext ctx) {
        requireNonNull(ctx, "ctx");
        // Only write to the attribute when it is actually present.
        if (ctx.hasAttr(CLIENT_PENDING_THROWABLE)) {
            ctx.setAttr(CLIENT_PENDING_THROWABLE, null);
        }
    }
}

0 comments on commit 6f256cb

Please sign in to comment.