GrpcRetryer now retries underlying DEADLINE_EXCEEDED if the root gRPC context deadline is not expired #709

Merged · 2 commits · Sep 11, 2021 · Changes from all commits
@@ -34,8 +34,8 @@ public interface PayloadConverter {

/**
* Each {@link PayloadConverter} has an Encoding Type that it handles. Each {@link
* PayloadConverter} should add the information about its Encoding Type into the {@link Payload} it
* produces inside {@link #toData(Object)} by associating it with the {@link
* PayloadConverter} should add the information about its Encoding Type into the {@link Payload}
* it produces inside {@link #toData(Object)} by associating it with the {@link
* EncodingKeys#METADATA_ENCODING_KEY} key attached to the {@code Payload}'s Metadata using {@link
* Payload.Builder#putMetadata(String, ByteString)}.
*
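For illustration, a minimal sketch of what this contract implies for a custom converter's toData implementation, assuming io.temporal.api.common.v1.Payload, io.temporal.common.converter.EncodingKeys and com.google.protobuf.ByteString; the encoding name and serialized bytes are placeholders:

// inside a hypothetical PayloadConverter#toData(Object value)
Payload payload =
    Payload.newBuilder()
        // stamp this converter's Encoding Type onto the Payload's metadata
        .putMetadata(
            EncodingKeys.METADATA_ENCODING_KEY,
            ByteString.copyFromUtf8("my-encoding/plain"))
        .setData(ByteString.copyFromUtf8("serialized-value"))
        .build();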
@@ -41,9 +41,9 @@ public static Header toHeaderGrpc(
}

/**
* Converts a {@code Map<String, Object>} into a {@code Map<String, Payload>} by applying specified converter on
* each value. This util should be used for things like search attributes and memo that need to be
* converted back from bytes on the server.
* Converts a {@code Map<String, Object>} into a {@code Map<String, Payload>} by applying
* specified converter on each value. This util should be used for things like search attributes
* and memo that need to be converted back from bytes on the server.
*/
public static Map<String, Payload> intoPayloadMap(
DataConverter converter, Map<String, Object> map) {
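A hedged usage sketch (memo keys and values are illustrative; assumes a static import of this method plus io.temporal.common.converter.DataConverter):

Map<String, Object> memo = new HashMap<>();
memo.put("customer", "acme");
memo.put("attempts", 3);
// each value is converted to a Payload with the default data converter
Map<String, Payload> payloads = intoPayloadMap(DataConverter.getDefaultInstance(), memo);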
@@ -19,6 +19,7 @@

package io.temporal.internal;

import io.grpc.Context;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -110,7 +111,9 @@ public CompletableFuture<Void> throttle() {
long delay = calculateSleepTime();
@SuppressWarnings({"FutureReturnValueIgnored", "unused"})
ScheduledFuture<?> ignored =
executor.schedule(() -> result.complete(null), delay, TimeUnit.MILLISECONDS);
executor.schedule(
// preserving gRPC context between threads
Context.current().wrap(() -> result.complete(null)), delay, TimeUnit.MILLISECONDS);
return result;
}

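The wrap call matters because the gRPC Context, including any deadline attached to it, is thread-local: a runnable handed to another executor would otherwise run under Context.ROOT and lose the deadline. A self-contained sketch of the idea (class name and timeouts are illustrative):

import io.grpc.Context;
import io.grpc.Deadline;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ContextPropagationSketch {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    Context.CancellableContext ctx =
        Context.current().withDeadlineAfter(10, TimeUnit.SECONDS, scheduler);
    Runnable task =
        ctx.wrap(
            () -> {
              // Without wrap(), this prints "null" because the scheduler thread runs
              // under Context.ROOT; with wrap(), the caller's deadline is visible here.
              Deadline deadline = Context.current().getDeadline();
              System.out.println("deadline on scheduler thread: " + deadline);
            });
    scheduler.schedule(task, 100, TimeUnit.MILLISECONDS);
    Thread.sleep(500);
    ctx.cancel(null); // release the deadline timer
    scheduler.shutdown();
  }
}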
@@ -19,6 +19,8 @@

package io.temporal.internal.retryer;

import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.StatusRuntimeException;
import io.temporal.internal.AsyncBackoffThrottler;
import io.temporal.serviceclient.RpcRetryOptions;
@@ -46,8 +48,9 @@ public <R> CompletableFuture<R> retry(
options.getInitialInterval(),
options.getMaximumInterval(),
options.getBackoffCoefficient());
options.validate();
CompletableFuture<R> resultCF = new CompletableFuture<>();
retry(options, function, 1, startTime, null, throttler, resultCF);
retry(options, function, 1, startTime, throttler, null, resultCF);
return resultCF;
}

@@ -56,10 +59,9 @@ private <R> void retry(
Supplier<CompletableFuture<R>> function,
int attempt,
long startTime,
StatusRuntimeException previousException,
AsyncBackoffThrottler throttler,
StatusRuntimeException previousException,
CompletableFuture<R> resultCF) {
options.validate();
throttler
.throttle()
.thenAccept(
@@ -137,21 +139,24 @@ private <R> void failOrRetry(

StatusRuntimeException statusRuntimeException = (StatusRuntimeException) currentException;

Deadline grpcContextDeadline = Context.current().getDeadline();
RuntimeException finalException =
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(
statusRuntimeException, previousException, options);
statusRuntimeException, previousException, options, grpcContextDeadline);
if (finalException != null) {
resultCF.completeExceptionally(finalException);
return;
}

if (GrpcRetryerUtils.ranOutOfRetries(options, startTime, clock.millis(), attempt)) {
if (GrpcRetryerUtils.ranOutOfRetries(
options, startTime, clock.millis(), attempt, grpcContextDeadline)) {
resultCF.completeExceptionally(statusRuntimeException);
return;
}

log.debug("Retrying after failure", currentException);
retry(options, function, attempt + 1, startTime, statusRuntimeException, throttler, resultCF);

retry(options, function, attempt + 1, startTime, throttler, statusRuntimeException, resultCF);
}

private static Throwable unwrapCompletionException(Throwable e) {
@@ -19,6 +19,7 @@

package io.temporal.internal.retryer;

import io.grpc.Deadline;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.serviceclient.RpcRetryOptions;
@@ -37,14 +38,15 @@ class GrpcRetryerUtils {
* @param previousException the previous exception that happened before this one, {@code null} if
* {@code currentException} is the first exception in the chain
* @param options retry options
* @param grpcContextDeadline current grpc context deadline
* @return null if the {@code exception} can be retried, a final exception to throw in the
* external code otherwise. In case {@code previousException} is {@link
* io.grpc.Status#DEADLINE_EXCEEDED}, we will prefer the previous exception if it's present.
* external code otherwise.
*/
static @Nullable RuntimeException createFinalExceptionIfNotRetryable(
@Nonnull StatusRuntimeException currentException,
@Nullable StatusRuntimeException previousException,
@Nonnull RpcRetryOptions options) {
@Nonnull RpcRetryOptions options,
@Nullable Deadline grpcContextDeadline) {
Status.Code code = currentException.getStatus().getCode();

switch (code) {
@@ -54,7 +56,17 @@ class GrpcRetryerUtils {
case CANCELLED:
return new CancellationException();
case DEADLINE_EXCEEDED:
return previousException != null ? previousException : currentException;
if (grpcContextDeadline != null && grpcContextDeadline.isExpired()) {
// If our higher level GRPC context deadline is expired,
// the underlying DEADLINE_EXCEEDED is likely meaningless, and
// we try to preserve the previous exception if it's present
return previousException != null ? previousException : currentException;
} else {
// If this specific request's deadline has expired, but the higher-level deadline
// that was established when GrpcRetryer was initialized has not been exceeded, we
// should keep retrying.
break;
}
default:
for (RpcRetryOptions.DoNotRetryItem pair : options.getDoNotRetry()) {
if (pair.getCode() == code
@@ -64,6 +76,7 @@ class GrpcRetryerUtils {
}
}
}

return null;
}

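For illustration, a hedged, test-style sketch of the new DEADLINE_EXCEEDED handling (the method is package-private, so this only compiles inside io.temporal.internal.retryer; options and deadlines are illustrative):

StatusRuntimeException rpcTimeout = Status.DEADLINE_EXCEEDED.asRuntimeException();
RpcRetryOptions retryOptions = RpcRetryOptions.newBuilder().validateBuildWithDefaults();

// Root gRPC context deadline still alive: null means "retryable", so an underlying
// per-request DEADLINE_EXCEEDED keeps being retried.
Deadline alive = Deadline.after(30, TimeUnit.SECONDS);
assert GrpcRetryerUtils.createFinalExceptionIfNotRetryable(rpcTimeout, null, retryOptions, alive) == null;

// Root deadline already expired: the exception becomes final and retrying stops.
Deadline expired = Deadline.after(-1, TimeUnit.MILLISECONDS);
assert GrpcRetryerUtils.createFinalExceptionIfNotRetryable(rpcTimeout, null, retryOptions, expired) != null;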
@@ -75,12 +88,17 @@ class GrpcRetryerUtils {
* @return true if we are out of attempts or time to retry
*/
static boolean ranOutOfRetries(
RpcRetryOptions options, long startTimeMs, long currentTimeMillis, int attempt) {
RpcRetryOptions options,
long startTimeMs,
long currentTimeMillis,
int attempt,
@Nullable Deadline grpcContextDeadline) {
int maxAttempts = options.getMaximumAttempts();
Duration expirationDuration = options.getExpiration();
long expirationInterval = expirationDuration != null ? expirationDuration.toMillis() : 0;

return (maxAttempts > 0 && attempt >= maxAttempts)
|| (expirationDuration != null && currentTimeMillis - startTimeMs >= expirationInterval);
|| (expirationDuration != null && currentTimeMillis - startTimeMs >= expirationInterval)
|| (grpcContextDeadline != null && grpcContextDeadline.isExpired());
}
}
@@ -19,6 +19,8 @@

package io.temporal.internal.retryer;

import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.StatusRuntimeException;
import io.temporal.internal.BackoffThrottler;
import io.temporal.serviceclient.RpcRetryOptions;
@@ -45,11 +47,11 @@ public <R, T extends Throwable> R retry(
options.getInitialInterval(),
options.getMaximumInterval(),
options.getBackoffCoefficient());
Deadline grpcContextDeadline = Context.current().getDeadline();

StatusRuntimeException lastException = null;
while (!GrpcRetryerUtils.ranOutOfRetries(options, startTime, clock.millis(), attempt)) {
do {
attempt++;

if (lastException != null) {
log.warn("Retrying after failure", lastException);
}
@@ -64,7 +66,8 @@ public <R, T extends Throwable> R retry(
throw new CancellationException();
} catch (StatusRuntimeException e) {
RuntimeException finalException =
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(e, lastException, options);
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(
e, lastException, options, grpcContextDeadline);
if (finalException != null) {
throw finalException;
}
@@ -74,7 +77,8 @@ public <R, T extends Throwable> R retry(
// It's designed this way because it's GrpcRetryer, not general purpose retryer.

throttler.failure();
}
} while (!GrpcRetryerUtils.ranOutOfRetries(
options, startTime, clock.millis(), attempt, grpcContextDeadline));

rethrow(lastException);
throw new IllegalStateException("unreachable");
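For callers, the root deadline consulted here is whatever Deadline is attached to the gRPC Context that is current when retrying starts. A hedged sketch of establishing such a deadline around a retried call (retryableCall is a hypothetical stand-in for the blocking RPC the retryer wraps):

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Context.CancellableContext root =
    Context.current().withDeadlineAfter(30, TimeUnit.SECONDS, scheduler);
try {
  root.run(
      () -> {
        // Inside this context, Context.current().getDeadline() is the 30s root deadline,
        // so per-request DEADLINE_EXCEEDED failures are retried until it expires.
        retryableCall(); // hypothetical blocking call wrapped by the retryer
      });
} finally {
  root.cancel(null);
  scheduler.shutdown();
}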
@@ -19,6 +19,7 @@

package io.temporal.serviceclient;

import com.google.common.base.Preconditions;
import com.uber.m3.tally.NoopScope;
import com.uber.m3.tally.Scope;
import io.grpc.*;
@@ -445,10 +446,19 @@ public Builder setRpcTimeout(Duration timeout) {

/**
* Sets the rpc timeout value for the following long poll based operations:
* PollWorkflowTaskQueue, PollActivityTaskQueue, GetWorkflowExecutionHistory. Should never be
* below 60 seconds as this is server side timeout for the long poll. Default is 70 seconds.
* PollWorkflowTaskQueue, PollActivityTaskQueue, GetWorkflowExecutionHistory.
*
* <p>Server side timeout for the long poll is 60s. This parameter should never be below 70
* seconds (server timeout + additional delay). Default is 70 seconds.
*
* @throws IllegalArgumentException if {@code timeout} is less than 70s
* @deprecated exposing this option for user configuration was deemed non-beneficial and
* dangerous
*/
@Deprecated
public Builder setRpcLongPollTimeout(Duration timeout) {
Preconditions.checkArgument(
timeout.toMillis() > 70_000, "rpcLongPollTimeout has to be longer than 70s");
Comment on lines +460 to +461

Member:
Since we're already deprecating here, I dunno that we need to potentially cause people pain by throwing here if they're already setting it too low.
I can go either way here though, so if you want to keep it, go for it.

Contributor (Author):
Setting less than 70s here now is just dangerous. So, I think it's the best we can do here.

this.rpcLongPollTimeout = Objects.requireNonNull(timeout);
return this;
}
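If the check stays, builder usage has to respect it, roughly as sketched below (assuming this builder is WorkflowServiceStubsOptions.Builder; the value is illustrative):

WorkflowServiceStubsOptions options =
    WorkflowServiceStubsOptions.newBuilder()
        // anything at or below 70s now fails the Preconditions check above
        .setRpcLongPollTimeout(Duration.ofSeconds(75))
        .build();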