Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public Builder setBackoffCoefficient(double backoffCoefficient) {
}

/**
* When exceeded the amount of attempt, stop. Even if expiration time is not reached. <br>
* When exceeded the amount of attempts, stop. Even if expiration time is not reached. <br>
* Default is unlimited.
*
* @param maximumAttempts Maximum number of attempts. Default will be used if set to {@code 0}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

package io.temporal.serviceclient;

import com.google.common.base.Defaults;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Status;
import io.temporal.internal.common.OptionsUtils;
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -147,11 +147,13 @@ public Builder setBackoffCoefficient(double backoffCoefficient) {
}

/**
* Maximum number of attempts. When exceeded the retries stop even if not expired yet. Must be 1
* or bigger. Default is unlimited.
* When exceeded the amount of attempts, stop. Even if expiration time is not reached. <br>
* Default is unlimited.
*
* @param maximumAttempts Maximum number of attempts. Default will be used if set to {@code 0}.
*/
public Builder setMaximumAttempts(int maximumAttempts) {
if (maximumAttempts < 1) {
if (maximumAttempts < 0) {
throw new IllegalArgumentException("Invalid maximumAttempts: " + maximumAttempts);
}
this.maximumAttempts = maximumAttempts;
Expand All @@ -160,11 +162,14 @@ public Builder setMaximumAttempts(int maximumAttempts) {

/**
* Maximum interval between retries. Exponential backoff leads to interval increase. This value
* is the cap of the increase. Default is 100x of initial interval.
* is the cap of the increase. <br>
* Default is 100x of initial interval. Can't be less than {@link #setInitialInterval(Duration)}
*
* @param maximumInterval the maximum interval value. Default will be used if set to {@code
* null}.
*/
public Builder setMaximumInterval(Duration maximumInterval) {
Objects.requireNonNull(maximumInterval);
if (maximumInterval.isNegative() || maximumInterval.isZero()) {
if (maximumInterval != null && (maximumInterval.isNegative() || maximumInterval.isZero())) {
throw new IllegalArgumentException("Invalid interval: " + maximumInterval);
}
this.maximumInterval = maximumInterval;
Expand Down Expand Up @@ -206,23 +211,19 @@ public Builder setRetryOptions(RpcRetryOptions o) {
if (o == null) {
return this;
}
setInitialInterval(merge(initialInterval, o.getInitialInterval(), Duration.class));
setExpiration(merge(expiration, o.getExpiration(), Duration.class));
setMaximumInterval(merge(maximumInterval, o.getMaximumInterval(), Duration.class));
setBackoffCoefficient(merge(backoffCoefficient, o.getBackoffCoefficient(), double.class));
setMaximumAttempts(merge(maximumAttempts, o.getMaximumAttempts(), int.class));
setInitialInterval(
OptionsUtils.merge(initialInterval, o.getInitialInterval(), Duration.class));
setExpiration(OptionsUtils.merge(expiration, o.getExpiration(), Duration.class));
setMaximumInterval(
OptionsUtils.merge(maximumInterval, o.getMaximumInterval(), Duration.class));
setBackoffCoefficient(
OptionsUtils.merge(backoffCoefficient, o.getBackoffCoefficient(), double.class));
setMaximumAttempts(OptionsUtils.merge(maximumAttempts, o.getMaximumAttempts(), int.class));
setDoNotRetry(merge(doNotRetry, o.getDoNotRetry()));
validateBuildWithDefaults();
return this;
}

private static <G> G merge(G annotation, G options, Class<G> type) {
if (!Defaults.defaultValue(type).equals(options)) {
return options;
}
return annotation;
}

private List<DoNotRetryItem> merge(List<DoNotRetryItem> o1, List<DoNotRetryItem> o2) {
if (o2 != null) {
return new ArrayList<>(o2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@

package io.temporal.serviceclient;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.uber.m3.tally.NoopScope;
import com.uber.m3.tally.Scope;
import io.grpc.*;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.*;

public class WorkflowServiceStubsOptions {

Expand Down Expand Up @@ -126,7 +124,7 @@ public static WorkflowServiceStubsOptions getDefaultInstance() {
private final Duration rpcQueryTimeout;

/** Retry options for outgoing RPC calls */
private RpcRetryOptions rpcRetryOptions;
private final RpcRetryOptions rpcRetryOptions;

/** Frequency at which connection backoff is going to be reset */
private final Duration connectionBackoffResetFrequency;
Expand All @@ -150,85 +148,49 @@ public static WorkflowServiceStubsOptions getDefaultInstance() {

private final Scope metricsScope;

private WorkflowServiceStubsOptions(Builder builder) {
this.target = builder.target;
this.sslContext = builder.sslContext;
this.enableHttps = builder.enableHttps;
this.channel = builder.channel;
this.rpcLongPollTimeout = builder.rpcLongPollTimeout;
this.rpcQueryTimeout = builder.rpcQueryTimeout;
this.rpcTimeout = builder.rpcTimeout;
this.rpcRetryOptions = builder.rpcRetryOptions;
this.connectionBackoffResetFrequency = builder.connectionBackoffResetFrequency;
this.grpcReconnectFrequency = builder.grpcReconnectFrequency;
this.headers = builder.headers;
this.grpcMetadataProviders = builder.grpcMetadataProviders;
this.grpcClientInterceptors = builder.grpcClientInterceptors;
this.metricsScope = builder.metricsScope;
this.disableHealthCheck = builder.disableHealthCheck;
this.healthCheckAttemptTimeout = builder.healthCheckAttemptTimeout;
this.healthCheckTimeout = builder.healthCheckTimeout;
this.enableKeepAlive = builder.enableKeepAlive;
this.keepAliveTime = builder.keepAliveTime;
this.keepAliveTimeout = builder.keepAliveTimeout;
this.keepAlivePermitWithoutStream = builder.keepAlivePermitWithoutStream;
}

private WorkflowServiceStubsOptions(Builder builder, boolean ignore) {
if (builder.target != null && builder.channel != null) {
throw new IllegalStateException(
"Only one of the target and channel options can be set at a time");
}

if (builder.sslContext != null && builder.channel != null) {
throw new IllegalStateException(
"Only one of the sslContext and channel options can be set at a time");
}

if (builder.enableHttps && builder.channel != null) {
throw new IllegalStateException(
"Only one of the enableHttps and channel options can be set at a time");
}

this.target =
builder.target == null && builder.channel == null
? DEFAULT_LOCAL_DOCKER_TARGET
: builder.target;
this.sslContext = builder.sslContext;
this.enableHttps = builder.enableHttps;
this.channel = builder.channel;
this.rpcLongPollTimeout = builder.rpcLongPollTimeout;
this.rpcQueryTimeout = builder.rpcQueryTimeout;
this.rpcTimeout = builder.rpcTimeout;
this.connectionBackoffResetFrequency = builder.connectionBackoffResetFrequency;
this.grpcReconnectFrequency = builder.grpcReconnectFrequency;
if (builder.headers != null) {
this.headers = builder.headers;
} else {
this.headers = new Metadata();
}
if (builder.grpcMetadataProviders != null) {
this.grpcMetadataProviders = builder.grpcMetadataProviders;
} else {
this.grpcMetadataProviders = Collections.emptyList();
}
if (builder.grpcClientInterceptors != null) {
this.grpcClientInterceptors = builder.grpcClientInterceptors;
} else {
this.grpcClientInterceptors = Collections.emptyList();
}
this.metricsScope = builder.metricsScope == null ? new NoopScope() : builder.metricsScope;
this.disableHealthCheck = builder.disableHealthCheck;
this.healthCheckAttemptTimeout =
builder.healthCheckAttemptTimeout == null
? Duration.ofSeconds(5)
: builder.healthCheckAttemptTimeout;
this.healthCheckTimeout =
builder.healthCheckTimeout == null ? Duration.ofSeconds(10) : builder.healthCheckTimeout;
this.enableKeepAlive = builder.enableKeepAlive;
this.keepAliveTime = builder.keepAliveTime;
this.keepAliveTimeout = builder.keepAliveTimeout;
this.keepAlivePermitWithoutStream = builder.keepAlivePermitWithoutStream;
private WorkflowServiceStubsOptions(
ManagedChannel channel,
String target,
SslContext sslContext,
boolean enableHttps,
boolean disableHealthCheck,
Duration healthCheckAttemptTimeout,
Duration healthCheckTimeout,
boolean enableKeepAlive,
Duration keepAliveTime,
Duration keepAliveTimeout,
boolean keepAlivePermitWithoutStream,
Duration rpcTimeout,
Duration rpcLongPollTimeout,
Duration rpcQueryTimeout,
RpcRetryOptions rpcRetryOptions,
Duration connectionBackoffResetFrequency,
Duration grpcReconnectFrequency,
Metadata headers,
Collection<GrpcMetadataProvider> grpcMetadataProviders,
Collection<ClientInterceptor> grpcClientInterceptors,
Scope metricsScope) {
this.channel = channel;
this.target = target;
this.sslContext = sslContext;
this.enableHttps = enableHttps;
this.disableHealthCheck = disableHealthCheck;
this.healthCheckAttemptTimeout = healthCheckAttemptTimeout;
this.healthCheckTimeout = healthCheckTimeout;
this.enableKeepAlive = enableKeepAlive;
this.keepAliveTime = keepAliveTime;
this.keepAliveTimeout = keepAliveTimeout;
this.keepAlivePermitWithoutStream = keepAlivePermitWithoutStream;
this.rpcTimeout = rpcTimeout;
this.rpcLongPollTimeout = rpcLongPollTimeout;
this.rpcQueryTimeout = rpcQueryTimeout;
this.rpcRetryOptions = rpcRetryOptions;
this.connectionBackoffResetFrequency = connectionBackoffResetFrequency;
this.grpcReconnectFrequency = grpcReconnectFrequency;
this.headers = headers;
this.grpcMetadataProviders = grpcMetadataProviders;
this.grpcClientInterceptors = grpcClientInterceptors;
this.metricsScope = metricsScope;
}

public ManagedChannel getChannel() {
Expand Down Expand Up @@ -378,8 +340,8 @@ public static class Builder {
private Duration connectionBackoffResetFrequency = DEFAULT_CONNECTION_BACKOFF_RESET_FREQUENCY;
private Duration grpcReconnectFrequency = DEFAULT_GRPC_RECONNECT_FREQUENCY;
private Metadata headers;
private Collection<GrpcMetadataProvider> grpcMetadataProviders = new ArrayList<>(0);
private Collection<ClientInterceptor> grpcClientInterceptors = new ArrayList<>(0);
private Collection<GrpcMetadataProvider> grpcMetadataProviders;
private Collection<ClientInterceptor> grpcClientInterceptors;
private Scope metricsScope;

private Builder() {}
Expand All @@ -396,8 +358,8 @@ private Builder(WorkflowServiceStubsOptions options) {
this.connectionBackoffResetFrequency = options.connectionBackoffResetFrequency;
this.grpcReconnectFrequency = options.grpcReconnectFrequency;
this.headers = options.headers;
this.grpcMetadataProviders = new ArrayList<>(options.grpcMetadataProviders);
this.grpcClientInterceptors = new ArrayList<>(options.grpcClientInterceptors);
this.grpcMetadataProviders = options.grpcMetadataProviders;
this.grpcClientInterceptors = options.grpcClientInterceptors;
this.metricsScope = options.metricsScope;
this.disableHealthCheck = options.disableHealthCheck;
this.healthCheckAttemptTimeout = options.healthCheckAttemptTimeout;
Expand Down Expand Up @@ -563,6 +525,9 @@ public Builder setHeaders(Metadata headers) {
* @return {@code this}
*/
public Builder addGrpcMetadataProvider(GrpcMetadataProvider grpcMetadataProvider) {
if (this.grpcMetadataProviders == null) {
setGrpcMetadataProviders(new ArrayList<>());
}
this.grpcMetadataProviders.add(grpcMetadataProvider);
return this;
}
Expand All @@ -583,6 +548,9 @@ public Builder setGrpcMetadataProviders(
* @return {@code this}
*/
public Builder addGrpcClientInterceptor(ClientInterceptor grpcClientInterceptor) {
if (this.grpcClientInterceptors == null) {
setGrpcClientInterceptors(new ArrayList<>());
}
this.grpcClientInterceptors.add(grpcClientInterceptor);
return this;
}
Expand Down Expand Up @@ -671,11 +639,85 @@ public Builder setKeepAlivePermitWithoutStream(boolean keepAlivePermitWithoutStr
* @return ClientOptions object with the specified params.
*/
public WorkflowServiceStubsOptions build() {
return new WorkflowServiceStubsOptions(this);
return new WorkflowServiceStubsOptions(
this.channel,
this.target,
this.sslContext,
this.enableHttps,
this.disableHealthCheck,
this.healthCheckAttemptTimeout,
this.healthCheckTimeout,
this.enableKeepAlive,
this.keepAliveTime,
this.keepAliveTimeout,
this.keepAlivePermitWithoutStream,
this.rpcTimeout,
this.rpcLongPollTimeout,
this.rpcQueryTimeout,
this.rpcRetryOptions,
this.connectionBackoffResetFrequency,
this.grpcReconnectFrequency,
this.headers,
this.grpcMetadataProviders,
this.grpcClientInterceptors,
this.metricsScope);
}

public WorkflowServiceStubsOptions validateAndBuildWithDefaults() {
return new WorkflowServiceStubsOptions(this, true);
if (this.target != null && this.channel != null) {
throw new IllegalStateException(
"Only one of the target and channel options can be set at a time");
}

if (this.sslContext != null && this.channel != null) {
throw new IllegalStateException(
"Only one of the sslContext and channel options can be set at a time");
}

if (this.enableHttps && this.channel != null) {
throw new IllegalStateException(
"Only one of the enableHttps and channel options can be set at a time");
}

String target =
this.target == null && this.channel == null ? DEFAULT_LOCAL_DOCKER_TARGET : this.target;

Metadata headers = this.headers != null ? this.headers : new Metadata();
Collection<GrpcMetadataProvider> grpcMetadataProviders =
MoreObjects.firstNonNull(this.grpcMetadataProviders, Collections.emptyList());
Collection<ClientInterceptor> grpcClientInterceptors =
MoreObjects.firstNonNull(this.grpcClientInterceptors, Collections.emptyList());

Scope metricsScope = this.metricsScope != null ? this.metricsScope : new NoopScope();
Duration healthCheckAttemptTimeout =
this.healthCheckAttemptTimeout != null
? this.healthCheckAttemptTimeout
: Duration.ofSeconds(5);
Duration healthCheckTimeout =
this.healthCheckTimeout != null ? this.healthCheckTimeout : Duration.ofSeconds(10);

return new WorkflowServiceStubsOptions(
this.channel,
target,
this.sslContext,
this.enableHttps,
this.disableHealthCheck,
healthCheckAttemptTimeout,
healthCheckTimeout,
this.enableKeepAlive,
this.keepAliveTime,
this.keepAliveTimeout,
this.keepAlivePermitWithoutStream,
this.rpcTimeout,
this.rpcLongPollTimeout,
this.rpcQueryTimeout,
this.rpcRetryOptions,
this.connectionBackoffResetFrequency,
this.grpcReconnectFrequency,
headers,
grpcMetadataProviders,
grpcClientInterceptors,
metricsScope);
}
}
}