Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import io.temporal.client.ActivityCanceledException;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.internal.common.CheckedExceptionWrapper;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.TerminatedFailure;
import io.temporal.failure.TimeoutFailure;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.TerminateWorkflowExecutionRequest;
import io.temporal.internal.common.GrpcRetryer;
import io.temporal.internal.common.SignalWithStartWorkflowExecutionParameters;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.FailureConverter;
import io.temporal.failure.TemporalFailure;
import io.temporal.internal.common.GrpcRetryer;
import io.temporal.internal.common.OptionsUtils;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package io.temporal.internal.replay;

import static io.temporal.internal.common.CheckedExceptionWrapper.wrap;
import static io.temporal.internal.common.ProtobufTimeUtils.toJavaDuration;
import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap;

import com.google.common.base.Throwables;
import com.google.protobuf.util.Durations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.internal.common.GrpcRetryer;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

package io.temporal.internal.statemachines;

import static io.temporal.internal.common.CheckedExceptionWrapper.unwrap;
import static io.temporal.internal.common.WorkflowExecutionUtils.getEventTypeForCommand;
import static io.temporal.internal.common.WorkflowExecutionUtils.isCommandEvent;
import static io.temporal.internal.statemachines.LocalActivityStateMachine.LOCAL_ACTIVITY_MARKER_NAME;
import static io.temporal.internal.statemachines.LocalActivityStateMachine.MARKER_ACTIVITY_ID_KEY;
import static io.temporal.internal.statemachines.VersionStateMachine.MARKER_CHANGE_ID_KEY;
import static io.temporal.internal.statemachines.VersionStateMachine.VERSION_MARKER_NAME;
import static io.temporal.serviceclient.CheckedExceptionWrapper.unwrap;

import com.cronutils.utils.VisibleForTesting;
import com.google.common.base.Strings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptorBase;
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.common.CheckedExceptionWrapper;
import io.temporal.internal.context.ContextThreadLocal;
import io.temporal.internal.replay.ExecuteActivityParameters;
import io.temporal.internal.replay.ExecuteLocalActivityParameters;
import io.temporal.internal.replay.ReplayWorkflowContext;
import io.temporal.internal.replay.StartChildWorkflowExecutionParameters;
import io.temporal.internal.replay.WorkflowExecutorCache;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.workflow.Functions;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.Functions.Func1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@
import io.temporal.failure.SimulatedTimeoutFailure;
import io.temporal.failure.TemporalFailure;
import io.temporal.failure.TimeoutFailure;
import io.temporal.internal.common.CheckedExceptionWrapper;
import io.temporal.internal.metrics.MetricsType;
import io.temporal.internal.replay.FailureWrapperException;
import io.temporal.internal.worker.ActivityTask;
import io.temporal.internal.worker.ActivityTaskHandler;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.lang.reflect.InvocationTargetException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package io.temporal.internal.sync;

import static io.temporal.internal.common.CheckedExceptionWrapper.wrap;
import static io.temporal.internal.sync.WorkflowInternal.unwrap;
import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap;

import com.google.common.base.Preconditions;
import io.temporal.api.common.v1.Payloads;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
import io.temporal.failure.FailureConverter;
import io.temporal.internal.common.CheckedExceptionWrapper;
import io.temporal.internal.logging.ReplayAwareLogger;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.workflow.ActivityStub;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.ChildWorkflowOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.FailureConverter;
import io.temporal.internal.common.CheckedExceptionWrapper;
import io.temporal.internal.common.StatusUtils;
import io.temporal.internal.common.WorkflowExecutionFailedException;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.serviceclient.StatusUtils;
import java.lang.reflect.Type;
import java.util.Optional;
import java.util.UUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
import io.temporal.internal.common.GrpcRetryer;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.metrics.MetricsType;
import io.temporal.internal.replay.FailureWrapperException;
import io.temporal.internal.worker.ActivityTaskHandler.Result;
import io.temporal.internal.worker.activity.ActivityWorkerHelper;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import com.uber.m3.tally.Scope;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.internal.common.BackoffThrottler;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.metrics.MetricsType;
import io.temporal.serviceclient.BackoffThrottler;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedRequest;
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedResponse;
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskFailedRequest;
import io.temporal.internal.common.GrpcRetryer;
import io.temporal.internal.common.WorkflowExecutionHistory;
import io.temporal.internal.common.WorkflowExecutionUtils;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.metrics.MetricsType;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.serviceclient.RpcRetryOptions;
import java.time.Duration;
import java.util.concurrent.CancellationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package io.temporal.internal.sync;

import io.temporal.internal.common.CheckedExceptionWrapper;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.workflow.Workflow;
import org.junit.Assert;
import org.junit.Test;
Expand Down
1 change: 1 addition & 0 deletions temporal-serviceclient/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
api 'io.grpc:grpc-stub:1.38.0'
api 'io.grpc:grpc-core:1.38.0'
api 'io.grpc:grpc-netty-shaded:1.38.0'
api 'io.grpc:grpc-services:1.38.0'
api group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.17.1'
api group: 'com.uber.m3', name: 'tally-core', version: '0.6.1'
api group: 'org.slf4j', name: 'slf4j-api', version: '1.7.30'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* permissions and limitations under the License.
*/

package io.temporal.internal.common;
package io.temporal.serviceclient;

import java.time.Duration;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* permissions and limitations under the License.
*/

package io.temporal.internal.common;
package io.temporal.serviceclient;

import java.time.Duration;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* permissions and limitations under the License.
*/

package io.temporal.internal.common;
package io.temporal.serviceclient;

import java.lang.reflect.InvocationTargetException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@
* permissions and limitations under the License.
*/

package io.temporal.internal.common;

import static io.temporal.internal.common.CheckedExceptionWrapper.unwrap;
package io.temporal.serviceclient;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.serviceclient.RpcRetryOptions;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -145,7 +142,7 @@ public static <R> CompletableFuture<R> retryWithResultAsync(
if (e == null) {
unwrappedExceptionResult.complete(r);
} else {
unwrappedExceptionResult.completeExceptionally(unwrap(e));
unwrappedExceptionResult.completeExceptionally(CheckedExceptionWrapper.unwrap(e));
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* permissions and limitations under the License.
*/

package io.temporal.internal.common;
package io.temporal.serviceclient;

import com.google.common.base.Preconditions;
import com.google.protobuf.Any;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@

package io.temporal.serviceclient;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.StatusRuntimeException;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
Expand All @@ -39,7 +45,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** TODO: (vkoby) Add metrics. */
public final class WorkflowServiceStubsImpl implements WorkflowServiceStubs {

private static final Logger log = LoggerFactory.getLogger(WorkflowServiceStubsImpl.class);
Expand All @@ -60,13 +65,17 @@ public final class WorkflowServiceStubsImpl implements WorkflowServiceStubs {

private static final String CLIENT_NAME_HEADER_VALUE = "temporal-java";

private static final String HEALTH_CHECK_SERVICE_NAME =
"temporal.api.workflowservice.v1.WorkflowService";

private final WorkflowServiceStubsOptions options;
private final ManagedChannel channel;
// Shutdown channel that was created by us
private final boolean channelNeedsShutdown;
private final AtomicBoolean shutdownRequested = new AtomicBoolean();
private final WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub;
private final WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub;
private final HealthGrpc.HealthBlockingStub healthBlockingStub;
private final Server inProcessServer;
private final ScheduledExecutorService grpcConnectionManager;

Expand Down Expand Up @@ -149,6 +158,10 @@ public WorkflowServiceStubsImpl(
}
channelNeedsShutdown = true;
}

healthBlockingStub = HealthGrpc.newBlockingStub(channel);
checkHealth();

GrpcMetricsInterceptor metricsInterceptor =
new GrpcMetricsInterceptor(options.getMetricsScope());
ClientInterceptor deadlineInterceptor = new GrpcDeadlineInterceptor(options);
Expand Down Expand Up @@ -212,6 +225,44 @@ private ScheduledExecutorService grpcConnectionManager() {
.build());
}

/**
* Checks service health using gRPC health check:
* https://github.com/grpc/grpc/blob/master/doc/health-checking.md
*
* @throws StatusRuntimeException if the service is unavailable.
* @throws RuntimeException if the check returns unhealthy status.
* @return true if server is up.
*/
private void checkHealth() {
checkHealth(HEALTH_CHECK_SERVICE_NAME);
}

@VisibleForTesting
void checkHealth(String serviceName) {
if (options.getDisableHealthCheck()) {
RpcRetryOptions retryOptions =
RpcRetryOptions.newBuilder()
.setExpiration(getOptions().getHealthCheckTimeout())
.validateBuildWithDefaults();

HealthCheckResponse response =
GrpcRetryer.retryWithResult(
retryOptions,
() -> {
return healthBlockingStub
.withDeadline(
Deadline.after(
options.getHealthCheckAttemptTimeout().getSeconds(), TimeUnit.SECONDS))
.check(HealthCheckRequest.newBuilder().setService(serviceName).build());
});

if (!HealthCheckResponse.ServingStatus.SERVING.equals(response.getStatus())) {
throw new RuntimeException(
"Health check returned unhealthy status: " + response.getStatus());
}
}
}

/** @return Blocking (synchronous) stub that allows direct calls to service. */
public WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub() {
return blockingStub;
Expand Down
Loading