From ef1ab362ebe5fed96319d79a58e4556b060d2a22 Mon Sep 17 00:00:00 2001 From: Dmitry Spikhalskiy Date: Wed, 12 Jan 2022 18:44:30 -0500 Subject: [PATCH] Fix WorkflowServiceStubsOptions#rpcRetryOptions wiring RpcRetryOptions implementation now allow #merge with nulls values --- .../java/io/temporal/common/RetryOptions.java | 2 +- .../internal/common/OptionsUtils.java | 0 .../serviceclient/RpcRetryOptions.java | 39 +-- .../WorkflowServiceStubsOptions.java | 222 +++++++++++------- 4 files changed, 153 insertions(+), 110 deletions(-) rename {temporal-sdk => temporal-serviceclient}/src/main/java/io/temporal/internal/common/OptionsUtils.java (100%) diff --git a/temporal-sdk/src/main/java/io/temporal/common/RetryOptions.java b/temporal-sdk/src/main/java/io/temporal/common/RetryOptions.java index 23076870cb..9ca0cab21b 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/RetryOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/common/RetryOptions.java @@ -177,7 +177,7 @@ public Builder setBackoffCoefficient(double backoffCoefficient) { } /** - * When exceeded the amount of attempt, stop. Even if expiration time is not reached.
+ * When exceeded the amount of attempts, stop. Even if expiration time is not reached.
* Default is unlimited. * * @param maximumAttempts Maximum number of attempts. Default will be used if set to {@code 0}. diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/OptionsUtils.java b/temporal-serviceclient/src/main/java/io/temporal/internal/common/OptionsUtils.java similarity index 100% rename from temporal-sdk/src/main/java/io/temporal/internal/common/OptionsUtils.java rename to temporal-serviceclient/src/main/java/io/temporal/internal/common/OptionsUtils.java diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/RpcRetryOptions.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/RpcRetryOptions.java index 993439d9a3..1dad0d60d6 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/RpcRetryOptions.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/RpcRetryOptions.java @@ -19,9 +19,9 @@ package io.temporal.serviceclient; -import com.google.common.base.Defaults; import com.google.protobuf.GeneratedMessageV3; import io.grpc.Status; +import io.temporal.internal.common.OptionsUtils; import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions; import java.time.Duration; import java.util.ArrayList; @@ -147,11 +147,13 @@ public Builder setBackoffCoefficient(double backoffCoefficient) { } /** - * Maximum number of attempts. When exceeded the retries stop even if not expired yet. Must be 1 - * or bigger. Default is unlimited. + * When exceeded the amount of attempts, stop. Even if expiration time is not reached.
+ * Default is unlimited. + * + * @param maximumAttempts Maximum number of attempts. Default will be used if set to {@code 0}. */ public Builder setMaximumAttempts(int maximumAttempts) { - if (maximumAttempts < 1) { + if (maximumAttempts < 0) { throw new IllegalArgumentException("Invalid maximumAttempts: " + maximumAttempts); } this.maximumAttempts = maximumAttempts; @@ -160,11 +162,14 @@ public Builder setMaximumAttempts(int maximumAttempts) { /** * Maximum interval between retries. Exponential backoff leads to interval increase. This value - * is the cap of the increase. Default is 100x of initial interval. + * is the cap of the increase.
+ * Default is 100x of initial interval. Can't be less than {@link #setInitialInterval(Duration)} + * + * @param maximumInterval the maximum interval value. Default will be used if set to {@code + * null}. */ public Builder setMaximumInterval(Duration maximumInterval) { - Objects.requireNonNull(maximumInterval); - if (maximumInterval.isNegative() || maximumInterval.isZero()) { + if (maximumInterval != null && (maximumInterval.isNegative() || maximumInterval.isZero())) { throw new IllegalArgumentException("Invalid interval: " + maximumInterval); } this.maximumInterval = maximumInterval; @@ -206,23 +211,19 @@ public Builder setRetryOptions(RpcRetryOptions o) { if (o == null) { return this; } - setInitialInterval(merge(initialInterval, o.getInitialInterval(), Duration.class)); - setExpiration(merge(expiration, o.getExpiration(), Duration.class)); - setMaximumInterval(merge(maximumInterval, o.getMaximumInterval(), Duration.class)); - setBackoffCoefficient(merge(backoffCoefficient, o.getBackoffCoefficient(), double.class)); - setMaximumAttempts(merge(maximumAttempts, o.getMaximumAttempts(), int.class)); + setInitialInterval( + OptionsUtils.merge(initialInterval, o.getInitialInterval(), Duration.class)); + setExpiration(OptionsUtils.merge(expiration, o.getExpiration(), Duration.class)); + setMaximumInterval( + OptionsUtils.merge(maximumInterval, o.getMaximumInterval(), Duration.class)); + setBackoffCoefficient( + OptionsUtils.merge(backoffCoefficient, o.getBackoffCoefficient(), double.class)); + setMaximumAttempts(OptionsUtils.merge(maximumAttempts, o.getMaximumAttempts(), int.class)); setDoNotRetry(merge(doNotRetry, o.getDoNotRetry())); validateBuildWithDefaults(); return this; } - private static G merge(G annotation, G options, Class type) { - if (!Defaults.defaultValue(type).equals(options)) { - return options; - } - return annotation; - } - private List merge(List o1, List o2) { if (o2 != null) { return new ArrayList<>(o2); diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsOptions.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsOptions.java index ee6355893c..b71f9459d8 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsOptions.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsOptions.java @@ -19,6 +19,7 @@ package io.temporal.serviceclient; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.uber.m3.tally.NoopScope; import com.uber.m3.tally.Scope; @@ -26,10 +27,7 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Objects; +import java.util.*; public class WorkflowServiceStubsOptions { @@ -126,7 +124,7 @@ public static WorkflowServiceStubsOptions getDefaultInstance() { private final Duration rpcQueryTimeout; /** Retry options for outgoing RPC calls */ - private RpcRetryOptions rpcRetryOptions; + private final RpcRetryOptions rpcRetryOptions; /** Frequency at which connection backoff is going to be reset */ private final Duration connectionBackoffResetFrequency; @@ -150,85 +148,49 @@ public static WorkflowServiceStubsOptions getDefaultInstance() { private final Scope metricsScope; - private WorkflowServiceStubsOptions(Builder builder) { - this.target = builder.target; - this.sslContext = builder.sslContext; - this.enableHttps = builder.enableHttps; - this.channel = builder.channel; - this.rpcLongPollTimeout = builder.rpcLongPollTimeout; - this.rpcQueryTimeout = builder.rpcQueryTimeout; - this.rpcTimeout = builder.rpcTimeout; - this.rpcRetryOptions = builder.rpcRetryOptions; - this.connectionBackoffResetFrequency = builder.connectionBackoffResetFrequency; - this.grpcReconnectFrequency = builder.grpcReconnectFrequency; - this.headers = builder.headers; - this.grpcMetadataProviders = builder.grpcMetadataProviders; - this.grpcClientInterceptors = builder.grpcClientInterceptors; - this.metricsScope = builder.metricsScope; - this.disableHealthCheck = builder.disableHealthCheck; - this.healthCheckAttemptTimeout = builder.healthCheckAttemptTimeout; - this.healthCheckTimeout = builder.healthCheckTimeout; - this.enableKeepAlive = builder.enableKeepAlive; - this.keepAliveTime = builder.keepAliveTime; - this.keepAliveTimeout = builder.keepAliveTimeout; - this.keepAlivePermitWithoutStream = builder.keepAlivePermitWithoutStream; - } - - private WorkflowServiceStubsOptions(Builder builder, boolean ignore) { - if (builder.target != null && builder.channel != null) { - throw new IllegalStateException( - "Only one of the target and channel options can be set at a time"); - } - - if (builder.sslContext != null && builder.channel != null) { - throw new IllegalStateException( - "Only one of the sslContext and channel options can be set at a time"); - } - - if (builder.enableHttps && builder.channel != null) { - throw new IllegalStateException( - "Only one of the enableHttps and channel options can be set at a time"); - } - - this.target = - builder.target == null && builder.channel == null - ? DEFAULT_LOCAL_DOCKER_TARGET - : builder.target; - this.sslContext = builder.sslContext; - this.enableHttps = builder.enableHttps; - this.channel = builder.channel; - this.rpcLongPollTimeout = builder.rpcLongPollTimeout; - this.rpcQueryTimeout = builder.rpcQueryTimeout; - this.rpcTimeout = builder.rpcTimeout; - this.connectionBackoffResetFrequency = builder.connectionBackoffResetFrequency; - this.grpcReconnectFrequency = builder.grpcReconnectFrequency; - if (builder.headers != null) { - this.headers = builder.headers; - } else { - this.headers = new Metadata(); - } - if (builder.grpcMetadataProviders != null) { - this.grpcMetadataProviders = builder.grpcMetadataProviders; - } else { - this.grpcMetadataProviders = Collections.emptyList(); - } - if (builder.grpcClientInterceptors != null) { - this.grpcClientInterceptors = builder.grpcClientInterceptors; - } else { - this.grpcClientInterceptors = Collections.emptyList(); - } - this.metricsScope = builder.metricsScope == null ? new NoopScope() : builder.metricsScope; - this.disableHealthCheck = builder.disableHealthCheck; - this.healthCheckAttemptTimeout = - builder.healthCheckAttemptTimeout == null - ? Duration.ofSeconds(5) - : builder.healthCheckAttemptTimeout; - this.healthCheckTimeout = - builder.healthCheckTimeout == null ? Duration.ofSeconds(10) : builder.healthCheckTimeout; - this.enableKeepAlive = builder.enableKeepAlive; - this.keepAliveTime = builder.keepAliveTime; - this.keepAliveTimeout = builder.keepAliveTimeout; - this.keepAlivePermitWithoutStream = builder.keepAlivePermitWithoutStream; + private WorkflowServiceStubsOptions( + ManagedChannel channel, + String target, + SslContext sslContext, + boolean enableHttps, + boolean disableHealthCheck, + Duration healthCheckAttemptTimeout, + Duration healthCheckTimeout, + boolean enableKeepAlive, + Duration keepAliveTime, + Duration keepAliveTimeout, + boolean keepAlivePermitWithoutStream, + Duration rpcTimeout, + Duration rpcLongPollTimeout, + Duration rpcQueryTimeout, + RpcRetryOptions rpcRetryOptions, + Duration connectionBackoffResetFrequency, + Duration grpcReconnectFrequency, + Metadata headers, + Collection grpcMetadataProviders, + Collection grpcClientInterceptors, + Scope metricsScope) { + this.channel = channel; + this.target = target; + this.sslContext = sslContext; + this.enableHttps = enableHttps; + this.disableHealthCheck = disableHealthCheck; + this.healthCheckAttemptTimeout = healthCheckAttemptTimeout; + this.healthCheckTimeout = healthCheckTimeout; + this.enableKeepAlive = enableKeepAlive; + this.keepAliveTime = keepAliveTime; + this.keepAliveTimeout = keepAliveTimeout; + this.keepAlivePermitWithoutStream = keepAlivePermitWithoutStream; + this.rpcTimeout = rpcTimeout; + this.rpcLongPollTimeout = rpcLongPollTimeout; + this.rpcQueryTimeout = rpcQueryTimeout; + this.rpcRetryOptions = rpcRetryOptions; + this.connectionBackoffResetFrequency = connectionBackoffResetFrequency; + this.grpcReconnectFrequency = grpcReconnectFrequency; + this.headers = headers; + this.grpcMetadataProviders = grpcMetadataProviders; + this.grpcClientInterceptors = grpcClientInterceptors; + this.metricsScope = metricsScope; } public ManagedChannel getChannel() { @@ -378,8 +340,8 @@ public static class Builder { private Duration connectionBackoffResetFrequency = DEFAULT_CONNECTION_BACKOFF_RESET_FREQUENCY; private Duration grpcReconnectFrequency = DEFAULT_GRPC_RECONNECT_FREQUENCY; private Metadata headers; - private Collection grpcMetadataProviders = new ArrayList<>(0); - private Collection grpcClientInterceptors = new ArrayList<>(0); + private Collection grpcMetadataProviders; + private Collection grpcClientInterceptors; private Scope metricsScope; private Builder() {} @@ -396,8 +358,8 @@ private Builder(WorkflowServiceStubsOptions options) { this.connectionBackoffResetFrequency = options.connectionBackoffResetFrequency; this.grpcReconnectFrequency = options.grpcReconnectFrequency; this.headers = options.headers; - this.grpcMetadataProviders = new ArrayList<>(options.grpcMetadataProviders); - this.grpcClientInterceptors = new ArrayList<>(options.grpcClientInterceptors); + this.grpcMetadataProviders = options.grpcMetadataProviders; + this.grpcClientInterceptors = options.grpcClientInterceptors; this.metricsScope = options.metricsScope; this.disableHealthCheck = options.disableHealthCheck; this.healthCheckAttemptTimeout = options.healthCheckAttemptTimeout; @@ -563,6 +525,9 @@ public Builder setHeaders(Metadata headers) { * @return {@code this} */ public Builder addGrpcMetadataProvider(GrpcMetadataProvider grpcMetadataProvider) { + if (this.grpcMetadataProviders == null) { + setGrpcMetadataProviders(new ArrayList<>()); + } this.grpcMetadataProviders.add(grpcMetadataProvider); return this; } @@ -583,6 +548,9 @@ public Builder setGrpcMetadataProviders( * @return {@code this} */ public Builder addGrpcClientInterceptor(ClientInterceptor grpcClientInterceptor) { + if (this.grpcClientInterceptors == null) { + setGrpcClientInterceptors(new ArrayList<>()); + } this.grpcClientInterceptors.add(grpcClientInterceptor); return this; } @@ -671,11 +639,85 @@ public Builder setKeepAlivePermitWithoutStream(boolean keepAlivePermitWithoutStr * @return ClientOptions object with the specified params. */ public WorkflowServiceStubsOptions build() { - return new WorkflowServiceStubsOptions(this); + return new WorkflowServiceStubsOptions( + this.channel, + this.target, + this.sslContext, + this.enableHttps, + this.disableHealthCheck, + this.healthCheckAttemptTimeout, + this.healthCheckTimeout, + this.enableKeepAlive, + this.keepAliveTime, + this.keepAliveTimeout, + this.keepAlivePermitWithoutStream, + this.rpcTimeout, + this.rpcLongPollTimeout, + this.rpcQueryTimeout, + this.rpcRetryOptions, + this.connectionBackoffResetFrequency, + this.grpcReconnectFrequency, + this.headers, + this.grpcMetadataProviders, + this.grpcClientInterceptors, + this.metricsScope); } public WorkflowServiceStubsOptions validateAndBuildWithDefaults() { - return new WorkflowServiceStubsOptions(this, true); + if (this.target != null && this.channel != null) { + throw new IllegalStateException( + "Only one of the target and channel options can be set at a time"); + } + + if (this.sslContext != null && this.channel != null) { + throw new IllegalStateException( + "Only one of the sslContext and channel options can be set at a time"); + } + + if (this.enableHttps && this.channel != null) { + throw new IllegalStateException( + "Only one of the enableHttps and channel options can be set at a time"); + } + + String target = + this.target == null && this.channel == null ? DEFAULT_LOCAL_DOCKER_TARGET : this.target; + + Metadata headers = this.headers != null ? this.headers : new Metadata(); + Collection grpcMetadataProviders = + MoreObjects.firstNonNull(this.grpcMetadataProviders, Collections.emptyList()); + Collection grpcClientInterceptors = + MoreObjects.firstNonNull(this.grpcClientInterceptors, Collections.emptyList()); + + Scope metricsScope = this.metricsScope != null ? this.metricsScope : new NoopScope(); + Duration healthCheckAttemptTimeout = + this.healthCheckAttemptTimeout != null + ? this.healthCheckAttemptTimeout + : Duration.ofSeconds(5); + Duration healthCheckTimeout = + this.healthCheckTimeout != null ? this.healthCheckTimeout : Duration.ofSeconds(10); + + return new WorkflowServiceStubsOptions( + this.channel, + target, + this.sslContext, + this.enableHttps, + this.disableHealthCheck, + healthCheckAttemptTimeout, + healthCheckTimeout, + this.enableKeepAlive, + this.keepAliveTime, + this.keepAliveTimeout, + this.keepAlivePermitWithoutStream, + this.rpcTimeout, + this.rpcLongPollTimeout, + this.rpcQueryTimeout, + this.rpcRetryOptions, + this.connectionBackoffResetFrequency, + this.grpcReconnectFrequency, + headers, + grpcMetadataProviders, + grpcClientInterceptors, + metricsScope); } } }