diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusClient.java b/temporal-sdk/src/main/java/io/temporal/client/NexusClient.java new file mode 100644 index 000000000..19a0e809d --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusClient.java @@ -0,0 +1,79 @@ +package io.temporal.client; + +import io.temporal.common.Experimental; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionOutput; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.lang.reflect.Type; +import javax.annotation.Nullable; + +/** + * Client for managing standalone Nexus operation executions. + * + *

Per-operation actions (describe, cancel, terminate, delete, get result) live on {@link + * NexusClientHandle}; obtain a handle via {@link #getHandle}. + */ +@Experimental +public interface NexusClient { + static NexusClient newInstance(WorkflowServiceStubs service) { + return NexusClientImpl.newInstance(service, NexusClientOperationOptions.getDefaultInstance()); + } + + static NexusClient newInstance( + WorkflowServiceStubs service, NexusClientOperationOptions options) { + return NexusClientImpl.newInstance(service, options); + } + + /** Returns the underlying gRPC stubs this client routes RPCs through. */ + WorkflowServiceStubs getWorkflowServiceStubs(); + + /** + * Obtain an untyped handle to an existing operation; targets the latest run. To bind a result + * type, wrap the returned handle with {@link NexusClientHandle#fromUntyped}. + */ + UntypedNexusClientHandle getHandle(String operationId); + + /** + * Obtain an untyped handle to an existing operation, optionally pinned to a specific run. To bind + * a result type, wrap the returned handle with {@link NexusClientHandle#fromUntyped}. + */ + UntypedNexusClientHandle getHandle(String operationId, @Nullable String runId); + + /** Obtain a typed handle to an existing operation, bound to {@code resultClass}. */ + NexusClientHandle getHandle( + String operationId, @Nullable String runId, Class resultClass); + + /** + * Obtain a typed handle to an existing operation, bound to {@code resultClass}/{@code + * resultType}. Use the {@code resultType} variant when the result is a generic type whose + * parameters cannot be captured by {@link Class} alone (e.g. {@code List}). + */ + NexusClientHandle getHandle( + String operationId, @Nullable String runId, Class resultClass, @Nullable Type resultType); + + /** Build an untyped service client targeting {@code endpoint}/{@code serviceName}. */ + UntypedNexusServiceClient newUntypedNexusServiceClient(String endpoint, String serviceName); + + /** + * Build a typed service client for {@code serviceInterface}, targeting {@code endpoint}. The + * service name is extracted from {@code serviceInterface} via {@link + * io.nexusrpc.ServiceDefinition#fromClass}. + */ + NexusServiceClient newNexusServiceClient(Class serviceInterface, String endpoint); + + /** Start a new standalone Nexus operation execution. */ + StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input); + + /** List standalone Nexus operation executions matching a query. */ + ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input); + + /** Count standalone Nexus operation executions matching a query. */ + CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input); +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusClientHandle.java b/temporal-sdk/src/main/java/io/temporal/client/NexusClientHandle.java new file mode 100644 index 000000000..5500eadd8 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusClientHandle.java @@ -0,0 +1,45 @@ +package io.temporal.client; + +import java.lang.reflect.Type; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nullable; + +/** + * Typed handle for interacting with an existing standalone Nexus operation execution. Add a result + * type binding to an {@link UntypedNexusClientHandle} (returned by {@link + * NexusClient#getHandle(String)}) by calling one of the {@link #fromUntyped} factories. + */ +public interface NexusClientHandle extends UntypedNexusClientHandle { + + /** Wrap an {@link UntypedNexusClientHandle} as a typed handle bound to {@code resultClass}. */ + static NexusClientHandle fromUntyped( + UntypedNexusClientHandle handle, Class resultClass) { + return fromUntyped(handle, resultClass, null); + } + + /** + * Wrap an {@link UntypedNexusClientHandle} as a typed handle bound to {@code resultClass} and + * {@code resultType}. Pass a non-null {@code resultType} when the result is a generic type whose + * parameters cannot be captured by {@link Class} alone (e.g. {@code List}). + */ + static NexusClientHandle fromUntyped( + UntypedNexusClientHandle handle, Class resultClass, @Nullable Type resultType) { + return NexusClientHandleImpl.fromUntyped(handle, resultClass, resultType); + } + + /** Block until the operation completes and return the typed result. */ + R getResult(); + + /** Block up to {@code timeout} for the operation to complete and return the typed result. */ + R getResult(long timeout, java.util.concurrent.TimeUnit unit) + throws java.util.concurrent.TimeoutException; + + /** Returns a future that completes with the typed result when the operation finishes. */ + CompletableFuture getResultAsync(); + + /** + * Returns a future that completes with the typed result, or completes exceptionally with a {@link + * java.util.concurrent.TimeoutException} if {@code timeout} elapses first. + */ + CompletableFuture getResultAsync(long timeout, java.util.concurrent.TimeUnit unit); +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusClientHandleImpl.java b/temporal-sdk/src/main/java/io/temporal/client/NexusClientHandleImpl.java new file mode 100644 index 000000000..8283ed575 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusClientHandleImpl.java @@ -0,0 +1,350 @@ +package io.temporal.client; + +import io.grpc.Deadline; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.enums.v1.NexusOperationWaitStage; +import io.temporal.api.failure.v1.Failure; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.DeleteNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.DescribeNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.DescribeNexusOperationExecutionOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.PollNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.PollNexusOperationExecutionOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.RequestCancelNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.TerminateNexusOperationExecutionInput; +import java.lang.reflect.Type; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; + +/** + * Single implementation of {@link NexusClientHandle}/{@link UntypedNexusClientHandle}. Constructed + * untyped by {@link NexusClient#getHandle(String)} and bound to a result type via {@link + * NexusClientHandle#fromUntyped}. + */ +public class NexusClientHandleImpl implements NexusClientHandle { + + /** Default deadline applied to per-handle non-poll RPCs (e.g. {@code describe}). */ + private static final long DEFAULT_DEADLINE_SECONDS = 30; + + /** + * Per-poll deadline used by {@link #getResult} and {@link #getResultAsync}. The server holds the + * request up to this long waiting for completion; if the operation hasn't finished, we re-poll. + */ + private static final long POLL_DEADLINE_SECONDS = 60; + + final NexusClientCallsInterceptor interceptor; + final String operationId; + final @Nullable String runId; + final DataConverter dataConverter; + final @Nullable Class resultClass; + final @Nullable Type resultType; + + /** Construct an untyped handle. Used by {@link NexusClientImpl#getHandle}. */ + public NexusClientHandleImpl( + NexusClientCallsInterceptor interceptor, + String operationId, + @Nullable String runId, + DataConverter dataConverter) { + this(interceptor, operationId, runId, dataConverter, null, null); + } + + /** + * Implementation of {@link NexusClientHandle#fromUntyped(UntypedNexusClientHandle, Class, Type)}. + * Lives here so the interface doesn't reach into impl-private state. + */ + static NexusClientHandle fromUntyped( + UntypedNexusClientHandle handle, Class resultClass, @Nullable Type resultType) { + if (!(handle instanceof NexusClientHandleImpl)) { + throw new IllegalArgumentException( + "Unsupported handle implementation: " + handle.getClass().getName()); + } + NexusClientHandleImpl source = (NexusClientHandleImpl) handle; + return new NexusClientHandleImpl<>( + source.interceptor, + source.operationId, + source.runId, + source.dataConverter, + resultClass, + resultType); + } + + /** Construct a typed handle. Use {@link NexusClientHandle#fromUntyped} from caller code. */ + NexusClientHandleImpl( + NexusClientCallsInterceptor interceptor, + String operationId, + @Nullable String runId, + DataConverter dataConverter, + @Nullable Class resultClass, + @Nullable Type resultType) { + if (interceptor == null) { + throw new IllegalArgumentException("interceptor is required"); + } + if (operationId == null) { + throw new IllegalArgumentException("operationId is required"); + } + if (dataConverter == null) { + throw new IllegalArgumentException("dataConverter is required"); + } + this.interceptor = interceptor; + this.operationId = operationId; + this.runId = runId; + this.dataConverter = dataConverter; + this.resultClass = resultClass; + this.resultType = resultType; + } + + @Override + public String getNexusOperationId() { + return operationId; + } + + @Override + public @Nullable String getNexusOperationRunId() { + return runId; + } + + @Override + public NexusClientOperationExecutionDescription describe() { + DescribeNexusOperationExecutionInput input = + new DescribeNexusOperationExecutionInput( + operationId, + runId, + /* includeInput= */ false, + /* includeOutcome= */ true, + Deadline.after(DEFAULT_DEADLINE_SECONDS, TimeUnit.SECONDS)); + DescribeNexusOperationExecutionOutput output = + interceptor.describeNexusOperationExecution(input); + return output.getDescription(); + } + + @Override + public void cancel() { + cancel(null); + } + + @Override + public void cancel(@Nullable String reason) { + interceptor.requestCancelNexusOperationExecution( + new RequestCancelNexusOperationExecutionInput(operationId, runId, reason)); + } + + @Override + public void terminate() { + terminate(null); + } + + @Override + public void terminate(@Nullable String reason) { + interceptor.terminateNexusOperationExecution( + new TerminateNexusOperationExecutionInput(operationId, runId, reason)); + } + + @Override + public void delete() { + interceptor.deleteNexusOperationExecution( + new DeleteNexusOperationExecutionInput(operationId, runId)); + } + + @Override + public X getResult(Class resultClass) { + return getResult(resultClass, null); + } + + @Override + public X getResult(Class resultClass, @Nullable Type resultType) { + PollNexusOperationExecutionOutput out = pollUntilCompleted(); + return extractResult(out, resultClass, resultType); + } + + @Override + public CompletableFuture getResultAsync(Class resultClass) { + return getResultAsync(resultClass, null); + } + + @Override + public CompletableFuture getResultAsync(Class resultClass, @Nullable Type resultType) { + return pollAsyncUntilCompleted().thenApply(out -> extractResult(out, resultClass, resultType)); + } + + @Override + public X getResult(long timeout, TimeUnit unit, Class resultClass) + throws TimeoutException { + return getResult(timeout, unit, resultClass, null); + } + + @Override + public X getResult( + long timeout, TimeUnit unit, Class resultClass, @Nullable Type resultType) + throws TimeoutException { + long deadlineNanos = System.nanoTime() + unit.toNanos(timeout); + PollNexusOperationExecutionOutput out = pollSyncUntilCompletedOrDeadline(deadlineNanos); + return extractResult(out, resultClass, resultType); + } + + @Override + public CompletableFuture getResultAsync( + long timeout, TimeUnit unit, Class resultClass) { + return getResultAsync(timeout, unit, resultClass, null); + } + + @Override + public CompletableFuture getResultAsync( + long timeout, TimeUnit unit, Class resultClass, @Nullable Type resultType) { + long deadlineNanos = System.nanoTime() + unit.toNanos(timeout); + return pollAsyncUntilCompletedOrDeadline(deadlineNanos) + .thenApply(out -> extractResult(out, resultClass, resultType)); + } + + @Override + public R getResult() { + if (resultClass == null) { + throw new IllegalStateException( + "getResult() requires a result type binding — wrap this handle with NexusClientHandle.fromUntyped"); + } + return getResult(resultClass, resultType); + } + + @Override + public R getResult(long timeout, TimeUnit unit) throws TimeoutException { + if (resultClass == null) { + throw new IllegalStateException( + "getResult() requires a result type binding — wrap this handle with NexusClientHandle.fromUntyped"); + } + return getResult(timeout, unit, resultClass, resultType); + } + + @Override + public CompletableFuture getResultAsync() { + if (resultClass == null) { + throw new IllegalStateException( + "getResultAsync() requires a result type binding — wrap this handle with NexusClientHandle.fromUntyped"); + } + return getResultAsync(resultClass, resultType); + } + + @Override + public CompletableFuture getResultAsync(long timeout, TimeUnit unit) { + if (resultClass == null) { + throw new IllegalStateException( + "getResultAsync() requires a result type binding — wrap this handle with NexusClientHandle.fromUntyped"); + } + return getResultAsync(timeout, unit, resultClass, resultType); + } + + /** Long-poll loop: re-poll if the server returns before the operation completes. */ + private PollNexusOperationExecutionOutput pollUntilCompleted() { + while (true) { + PollNexusOperationExecutionOutput out = + interceptor.pollNexusOperationExecution(buildPollInput()); + if (out.getWaitStage() == NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED) { + return out; + } + } + } + + /** Async long-poll loop using {@code thenCompose} to recurse without blocking a thread. */ + private CompletableFuture pollAsyncUntilCompleted() { + return interceptor + .pollNexusOperationExecutionAsync(buildPollInput()) + .thenCompose( + out -> { + if (out.getWaitStage() == NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED) { + return CompletableFuture.completedFuture(out); + } + return pollAsyncUntilCompleted(); + }); + } + + /** Sync poll loop bounded by an absolute nanos deadline. */ + private PollNexusOperationExecutionOutput pollSyncUntilCompletedOrDeadline(long deadlineNanos) + throws TimeoutException { + while (true) { + long remainingNanos = deadlineNanos - System.nanoTime(); + if (remainingNanos <= 0) { + throw new TimeoutException("getResult timed out before the operation completed"); + } + long pollDeadlineNanos = + Math.min(remainingNanos, TimeUnit.SECONDS.toNanos(POLL_DEADLINE_SECONDS)); + PollNexusOperationExecutionInput pollInput = + new PollNexusOperationExecutionInput( + operationId, + runId, + NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED, + Deadline.after(pollDeadlineNanos, TimeUnit.NANOSECONDS)); + PollNexusOperationExecutionOutput out; + try { + out = interceptor.pollNexusOperationExecution(pollInput); + } catch (RuntimeException e) { + if (System.nanoTime() >= deadlineNanos) { + TimeoutException timeout = + new TimeoutException("getResult timed out before the operation completed"); + timeout.initCause(e); + throw timeout; + } + throw e; + } + if (out.getWaitStage() == NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED) { + return out; + } + } + } + + /** Async poll loop bounded by an absolute nanos deadline. */ + private CompletableFuture pollAsyncUntilCompletedOrDeadline( + long deadlineNanos) { + long remainingNanos = deadlineNanos - System.nanoTime(); + if (remainingNanos <= 0) { + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally( + new TimeoutException("getResultAsync timed out before the operation completed")); + return failed; + } + long pollDeadlineNanos = + Math.min(remainingNanos, TimeUnit.SECONDS.toNanos(POLL_DEADLINE_SECONDS)); + PollNexusOperationExecutionInput pollInput = + new PollNexusOperationExecutionInput( + operationId, + runId, + NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED, + Deadline.after(pollDeadlineNanos, TimeUnit.NANOSECONDS)); + return interceptor + .pollNexusOperationExecutionAsync(pollInput) + .thenCompose( + out -> { + if (out.getWaitStage() == NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED) { + return CompletableFuture.completedFuture(out); + } + return pollAsyncUntilCompletedOrDeadline(deadlineNanos); + }); + } + + private PollNexusOperationExecutionInput buildPollInput() { + return new PollNexusOperationExecutionInput( + operationId, + runId, + NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED, + Deadline.after(POLL_DEADLINE_SECONDS, TimeUnit.SECONDS)); + } + + /** + * Convert a completed poll response into the typed result, throwing the operation's failure as an + * exception if it failed. + */ + private X extractResult( + PollNexusOperationExecutionOutput out, Class resultClass, @Nullable Type resultType) { + Optional failure = out.getFailure(); + if (failure.isPresent()) { + throw dataConverter.failureToException(failure.get()); + } + Optional payload = out.getResult(); + if (!payload.isPresent()) { + return null; + } + return dataConverter.fromPayload( + payload.get(), resultClass, resultType != null ? resultType : resultClass); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusClientImpl.java b/temporal-sdk/src/main/java/io/temporal/client/NexusClientImpl.java new file mode 100644 index 000000000..22a6efd0f --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusClientImpl.java @@ -0,0 +1,149 @@ +package io.temporal.client; + +import static io.temporal.internal.WorkflowThreadMarker.enforceNonWorkflowThread; + +import com.uber.m3.tally.Scope; +import io.temporal.common.Experimental; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionOutput; +import io.temporal.common.interceptors.NexusClientInterceptor; +import io.temporal.internal.WorkflowThreadMarker; +import io.temporal.internal.client.NamespaceInjectWorkflowServiceStubs; +import io.temporal.internal.client.RootNexusClientInvoker; +import io.temporal.internal.client.external.GenericWorkflowClient; +import io.temporal.internal.client.external.GenericWorkflowClientImpl; +import io.temporal.serviceclient.MetricsTag; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.List; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Experimental +public class NexusClientImpl implements NexusClient { + + private static final Logger log = LoggerFactory.getLogger(NexusClientImpl.class); + + private final WorkflowServiceStubs workflowServiceStubs; + private final NexusClientOperationOptions options; + private final GenericWorkflowClient genericClient; + private final Scope metricsScope; + private final NexusClientCallsInterceptor nexusClientCallsInvoker; + private final List interceptors; + + public static NexusClient newInstance( + WorkflowServiceStubs service, NexusClientOperationOptions options) { + enforceNonWorkflowThread(); + return WorkflowThreadMarker.protectFromWorkflowThread( + new NexusClientImpl(service, options), NexusClient.class); + } + + NexusClientImpl(WorkflowServiceStubs workflowServiceStubs, NexusClientOperationOptions options) { + workflowServiceStubs = + new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace()); + this.workflowServiceStubs = workflowServiceStubs; + this.options = options; + this.metricsScope = + workflowServiceStubs + .getOptions() + .getMetricsScope() + .tagged(MetricsTag.defaultTags(options.getNamespace())); + this.genericClient = new GenericWorkflowClientImpl(workflowServiceStubs, metricsScope); + this.interceptors = options.getInterceptors(); + this.nexusClientCallsInvoker = initializeClientInvoker(); + if (log.isDebugEnabled()) { + log.debug( + "NexusClient initialized: namespace={}, interceptors={}", + options.getNamespace(), + interceptors.size()); + } + } + + private NexusClientCallsInterceptor initializeClientInvoker() { + NexusClientCallsInterceptor invoker = new RootNexusClientInvoker(genericClient, options); + for (NexusClientInterceptor clientInterceptor : interceptors) { + NexusClientCallsInterceptor wrapped = clientInterceptor.nexusClientCallsInterceptor(invoker); + if (wrapped == null) { + throw new IllegalStateException( + "NexusClientInterceptor " + + clientInterceptor.getClass().getName() + + " returned null from nexusClientCallsInterceptor; expected a non-null" + + " NexusClientCallsInterceptor wrapping the supplied next link"); + } + invoker = wrapped; + } + return invoker; + } + + @Override + public WorkflowServiceStubs getWorkflowServiceStubs() { + return workflowServiceStubs; + } + + @Override + public UntypedNexusClientHandle getHandle(String operationId) { + return getHandle(operationId, null); + } + + @Override + public UntypedNexusClientHandle getHandle(String operationId, @Nullable String runId) { + return new NexusClientHandleImpl<>( + nexusClientCallsInvoker, operationId, runId, options.getDataConverter()); + } + + @Override + public NexusClientHandle getHandle( + String operationId, @Nullable String runId, Class resultClass) { + return getHandle(operationId, runId, resultClass, null); + } + + @Override + public NexusClientHandle getHandle( + String operationId, + @Nullable String runId, + Class resultClass, + @Nullable java.lang.reflect.Type resultType) { + return new NexusClientHandleImpl<>( + nexusClientCallsInvoker, + operationId, + runId, + options.getDataConverter(), + resultClass, + resultType); + } + + @Override + public UntypedNexusServiceClient newUntypedNexusServiceClient( + String endpoint, String serviceName) { + return new UntypedNexusServiceClientImpl(this, endpoint, serviceName, options); + } + + @Override + public NexusServiceClient newNexusServiceClient( + Class serviceInterface, String endpoint) { + return new NexusServiceClientImpl<>(this, serviceInterface, endpoint, options); + } + + @Override + public StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input) { + return nexusClientCallsInvoker.startNexusOperationExecution(input); + } + + @Override + public ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input) { + return nexusClientCallsInvoker.listNexusOperationExecutions(input); + } + + @Override + public CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input) { + return nexusClientCallsInvoker.countNexusOperationExecutions(input); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusClientOperationExecutionDescription.java b/temporal-sdk/src/main/java/io/temporal/client/NexusClientOperationExecutionDescription.java new file mode 100644 index 000000000..fa1ec0ca4 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusClientOperationExecutionDescription.java @@ -0,0 +1,26 @@ +package io.temporal.client; + +import io.temporal.api.workflowservice.v1.DescribeNexusOperationExecutionResponse; +import io.temporal.common.Experimental; + +/** Snapshot of a standalone Nexus operation execution returned by describe/poll calls. */ +@Experimental +public final class NexusClientOperationExecutionDescription { + + private final DescribeNexusOperationExecutionResponse response; + + public NexusClientOperationExecutionDescription( + DescribeNexusOperationExecutionResponse response) { + this.response = response; + } + + /** Run ID of the operation described. */ + public String getRunId() { + return response.getRunId(); + } + + /** Underlying proto response. Exposed while the Nexus SDK surface is still experimental. */ + public DescribeNexusOperationExecutionResponse getRawResponse() { + return response; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusClientOperationOptions.java b/temporal-sdk/src/main/java/io/temporal/client/NexusClientOperationOptions.java new file mode 100644 index 000000000..0cf2d92a9 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusClientOperationOptions.java @@ -0,0 +1,197 @@ +package io.temporal.client; + +import io.temporal.api.enums.v1.NexusOperationIdConflictPolicy; +import io.temporal.api.enums.v1.NexusOperationIdReusePolicy; +import io.temporal.common.SearchAttributes; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.converter.GlobalDataConverter; +import io.temporal.common.interceptors.NexusClientInterceptor; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; + +public class NexusClientOperationOptions { + + private final String namespace; + private final List interceptors; + private final DataConverter dataConverter; + private final @Nullable SearchAttributes searchAttributes; + private final @Nullable String summary; + private final @Nullable NexusOperationIdReusePolicy idReusePolicy; + private final @Nullable NexusOperationIdConflictPolicy idConflictPolicy; + + private NexusClientOperationOptions( + String namespace, + List interceptors, + DataConverter dataConverter, + @Nullable SearchAttributes searchAttributes, + @Nullable String summary, + @Nullable NexusOperationIdReusePolicy idReusePolicy, + @Nullable NexusOperationIdConflictPolicy idConflictPolicy) { + this.namespace = namespace; + this.interceptors = interceptors; + this.dataConverter = dataConverter; + this.searchAttributes = searchAttributes; + this.summary = summary; + this.idReusePolicy = idReusePolicy; + this.idConflictPolicy = idConflictPolicy; + } + + /** Get the namespace this client will operate on. */ + public String getNamespace() { + return namespace; + } + + /** Get the interceptors of this client. */ + public List getInterceptors() { + return interceptors; + } + + /** Get the data converter used to serialize Nexus operation inputs and deserialize results. */ + public DataConverter getDataConverter() { + return dataConverter; + } + + /** + * Default search attributes attached to operations started through this client. May be {@code + * null}. + * + *

Encoded to the proto representation and forwarded into every {@code + * StartNexusOperationExecution} request issued through this client. + */ + public @Nullable SearchAttributes getSearchAttributes() { + return searchAttributes; + } + + /** + * Default operation summary attached to operations started through this client. May be {@code + * null}. + */ + public @Nullable String getSummary() { + return summary; + } + + /** + * Default operation-id reuse policy applied when starting operations through this client. May be + * {@code null} (server default applies). + */ + public @Nullable NexusOperationIdReusePolicy getIdReusePolicy() { + return idReusePolicy; + } + + /** + * Default operation-id conflict policy applied when starting operations through this client. May + * be {@code null} (server default applies). + */ + public @Nullable NexusOperationIdConflictPolicy getIdConflictPolicy() { + return idConflictPolicy; + } + + public static NexusClientOperationOptions.Builder newBuilder() { + return new NexusClientOperationOptions.Builder(); + } + + public static NexusClientOperationOptions.Builder newBuilder( + NexusClientOperationOptions options) { + return new NexusClientOperationOptions.Builder(options); + } + + private static final NexusClientOperationOptions DEFAULT_INSTANCE; + + public static NexusClientOperationOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + static { + DEFAULT_INSTANCE = NexusClientOperationOptions.newBuilder().build(); + } + + public static class Builder { + private String namespace; + private List interceptors = Collections.emptyList(); + private DataConverter dataConverter = GlobalDataConverter.get(); + private @Nullable SearchAttributes searchAttributes; + private @Nullable String summary; + private @Nullable NexusOperationIdReusePolicy idReusePolicy; + private @Nullable NexusOperationIdConflictPolicy idConflictPolicy; + + private Builder() {} + + private Builder(NexusClientOperationOptions options) { + if (options == null) { + return; + } + namespace = options.namespace; + interceptors = options.interceptors; + dataConverter = options.dataConverter; + searchAttributes = options.searchAttributes; + summary = options.summary; + idReusePolicy = options.idReusePolicy; + idConflictPolicy = options.idConflictPolicy; + } + + /** Set the namespace this client will operate on. */ + public NexusClientOperationOptions.Builder setNamespace(String namespace) { + this.namespace = namespace; + return this; + } + + /** Set the interceptors for this client, but don't allow null lists to happen. */ + public NexusClientOperationOptions.Builder setInterceptors( + List interceptors) { + if (interceptors == null) { + this.interceptors = Collections.emptyList(); + } else { + this.interceptors = interceptors; + } + return this; + } + + /** + * Set the data converter used to serialize Nexus operation inputs and deserialize results. + * Defaults to {@link GlobalDataConverter#get()}. + */ + public NexusClientOperationOptions.Builder setDataConverter(DataConverter dataConverter) { + this.dataConverter = dataConverter; + return this; + } + + /** Set default search attributes attached to operations started through this client. */ + public NexusClientOperationOptions.Builder setSearchAttributes( + @Nullable SearchAttributes searchAttributes) { + this.searchAttributes = searchAttributes; + return this; + } + + /** Set the default operation summary attached to operations started through this client. */ + public NexusClientOperationOptions.Builder setSummary(@Nullable String summary) { + this.summary = summary; + return this; + } + + /** Set the default operation-id reuse policy. */ + public NexusClientOperationOptions.Builder setIdReusePolicy( + @Nullable NexusOperationIdReusePolicy idReusePolicy) { + this.idReusePolicy = idReusePolicy; + return this; + } + + /** Set the default operation-id conflict policy. */ + public NexusClientOperationOptions.Builder setIdConflictPolicy( + @Nullable NexusOperationIdConflictPolicy idConflictPolicy) { + this.idConflictPolicy = idConflictPolicy; + return this; + } + + public NexusClientOperationOptions build() { + return new NexusClientOperationOptions( + namespace, + interceptors, + dataConverter, + searchAttributes, + summary, + idReusePolicy, + idConflictPolicy); + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClient.java b/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClient.java new file mode 100644 index 000000000..909c1e604 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClient.java @@ -0,0 +1,68 @@ +package io.temporal.client; + +import io.temporal.common.Experimental; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.workflow.NexusOperationOptions; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; + +/** + * Typed client for invoking standalone Nexus operations on a specific service interface {@code T}. + * + *

Operations are dispatched via method references (or {@link BiFunction} lambdas) that target + * methods on {@code T}; the client extracts the operation name from the invocation and delegates to + * {@link NexusClient}. {@code listNexusOperationExecutions} and {@code + * countNexusOperationExecutions} are automatically scoped to operations of service {@code T}. + */ +@Experimental +public interface NexusServiceClient extends UntypedNexusServiceClient { + + static NexusServiceClient newInstance( + Class service, String endpoint, WorkflowServiceStubs stubs) { + return newInstance(service, endpoint, stubs, NexusClientOperationOptions.getDefaultInstance()); + } + + static NexusServiceClient newInstance( + Class service, + String endpoint, + WorkflowServiceStubs stubs, + NexusClientOperationOptions options) { + return NexusClient.newInstance(stubs, options).newNexusServiceClient(service, endpoint); + } + + /** + * Execute an operation synchronously. Equivalent to {@link #start(BiFunction, Object)} followed + * by {@link NexusClientHandle#getResult()}. + */ + R execute(BiFunction operation, U input); + + /** Execute an operation synchronously with per-call options. */ + R execute(BiFunction operation, U input, NexusOperationOptions options); + + /** Start an operation and return a typed handle to track its execution. */ + NexusClientHandle start(BiFunction operation, U input); + + /** Start an operation with per-call options and return a typed handle. */ + NexusClientHandle start( + BiFunction operation, U input, NexusOperationOptions options); + + /** + * Async variant of {@link #execute(BiFunction, Object)}. Returns a {@link CompletableFuture} that + * completes with the typed result, or completes exceptionally if the operation fails. + */ + CompletableFuture executeAsync(BiFunction operation, U input); + + /** Async variant of {@link #execute(BiFunction, Object, NexusOperationOptions)}. */ + CompletableFuture executeAsync( + BiFunction operation, U input, NexusOperationOptions options); + + /** + * Async variant of {@link #start(BiFunction, Object)}. Returns a {@link CompletableFuture} that + * completes with the typed handle once the start RPC has acknowledged the operation. + */ + CompletableFuture> startAsync(BiFunction operation, U input); + + /** Async variant of {@link #start(BiFunction, Object, NexusOperationOptions)}. */ + CompletableFuture> startAsync( + BiFunction operation, U input, NexusOperationOptions options); +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClientImpl.java b/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClientImpl.java new file mode 100644 index 000000000..685b08e37 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClientImpl.java @@ -0,0 +1,124 @@ +package io.temporal.client; + +import com.google.common.base.Defaults; +import io.nexusrpc.Operation; +import io.nexusrpc.ServiceDefinition; +import io.temporal.common.Experimental; +import io.temporal.workflow.NexusOperationOptions; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.Type; +import java.util.function.BiFunction; + +/** + * Typed Nexus service client. Extracts the operation name from a {@link BiFunction} that targets a + * method on the service interface (via a {@link Proxy} of {@code T}) and delegates to {@link + * NexusClient}. List/count are automatically scoped to operations of service {@code T} via a {@code + * Service="..."} visibility filter. + */ +@Experimental +class NexusServiceClientImpl extends UntypedNexusServiceClientImpl + implements NexusServiceClient { + + private final Class serviceInterface; + + NexusServiceClientImpl( + NexusClient client, + Class serviceInterface, + String endpoint, + NexusClientOperationOptions options) { + super(client, endpoint, ServiceDefinition.fromClass(serviceInterface).getName(), options); + this.serviceInterface = serviceInterface; + } + + @Override + public R execute(BiFunction operation, U input) { + return execute(operation, input, NexusOperationOptions.getDefaultInstance()); + } + + @Override + public R execute(BiFunction operation, U input, NexusOperationOptions options) { + return start(operation, input, options).getResult(); + } + + @Override + public NexusClientHandle start(BiFunction operation, U input) { + return start(operation, input, NexusOperationOptions.getDefaultInstance()); + } + + @Override + public NexusClientHandle start( + BiFunction operation, U input, NexusOperationOptions options) { + OperationCapture capture = captureOperation(operation, input); + UntypedNexusClientHandle untyped = start(capture.operationName, options, input); + return NexusClientHandle.fromUntyped(untyped, capture.resultClass, capture.resultType); + } + + @Override + public java.util.concurrent.CompletableFuture executeAsync( + BiFunction operation, U input) { + return executeAsync(operation, input, NexusOperationOptions.getDefaultInstance()); + } + + @Override + public java.util.concurrent.CompletableFuture executeAsync( + BiFunction operation, U input, NexusOperationOptions options) { + return startAsync(operation, input, options).thenCompose(NexusClientHandle::getResultAsync); + } + + @Override + public java.util.concurrent.CompletableFuture> startAsync( + BiFunction operation, U input) { + return startAsync(operation, input, NexusOperationOptions.getDefaultInstance()); + } + + @Override + public java.util.concurrent.CompletableFuture> startAsync( + BiFunction operation, U input, NexusOperationOptions options) { + // The underlying start RPC is sync; wrap on the common pool. A truly non-blocking start would + // require an async startNexusOperationExecution variant on the calls interceptor. + return java.util.concurrent.CompletableFuture.supplyAsync( + () -> start(operation, input, options)); + } + + /** Records the operation method invoked on the service proxy. */ + private static final class OperationCapture { + String operationName; + + @SuppressWarnings("rawtypes") + Class resultClass; + + Type resultType; + } + + @SuppressWarnings({"unchecked", "ReturnValueIgnored"}) + private OperationCapture captureOperation(BiFunction operation, U input) { + OperationCapture capture = new OperationCapture<>(); + InvocationHandler handler = + (Object proxy, Method method, Object[] args) -> { + if (Object.class.equals(method.getDeclaringClass())) { + return Defaults.defaultValue(method.getReturnType()); + } + Operation opAnnotation = method.getAnnotation(Operation.class); + capture.operationName = + opAnnotation != null && !opAnnotation.name().isEmpty() + ? opAnnotation.name() + : method.getName(); + capture.resultClass = method.getReturnType(); + capture.resultType = method.getGenericReturnType(); + return Defaults.defaultValue(method.getReturnType()); + }; + T proxy = + (T) + Proxy.newProxyInstance( + serviceInterface.getClassLoader(), new Class[] {serviceInterface}, handler); + operation.apply(proxy, input); + if (capture.operationName == null) { + throw new IllegalArgumentException( + "Could not extract Nexus operation name; the BiFunction must invoke a method on the" + + " service proxy (e.g. ServiceInterface::operationMethod)"); + } + return capture; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusClientHandle.java b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusClientHandle.java new file mode 100644 index 000000000..0054e28a7 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusClientHandle.java @@ -0,0 +1,58 @@ +package io.temporal.client; + +import java.lang.reflect.Type; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; + +public interface UntypedNexusClientHandle { + /** Operation ID this handle was constructed for. Always non-null. */ + String getNexusOperationId(); + + /** + * Present if the handle was returned by `start` or set when calling `getHandle`. Null if + * `getHandle` was called with a null run ID — in that case, use {@link #describe()} to learn the + * current run ID. + */ + @Nullable + String getNexusOperationRunId(); + + R getResult(Class resultClass); + + R getResult(Class resultClass, @Nullable Type resultType); + + /** + * Block up to {@code timeout} for the operation to complete and return the typed result. Throws + * {@link TimeoutException} if the operation has not completed within the deadline. + */ + R getResult(long timeout, TimeUnit unit, Class resultClass) throws TimeoutException; + + R getResult(long timeout, TimeUnit unit, Class resultClass, @Nullable Type resultType) + throws TimeoutException; + + CompletableFuture getResultAsync(Class resultClass); + + CompletableFuture getResultAsync(Class resultClass, @Nullable Type resultType); + + /** + * Returns a future that completes with the typed result, or completes exceptionally with a {@link + * TimeoutException} if {@code timeout} elapses before the operation finishes. + */ + CompletableFuture getResultAsync(long timeout, TimeUnit unit, Class resultClass); + + CompletableFuture getResultAsync( + long timeout, TimeUnit unit, Class resultClass, @Nullable Type resultType); + + NexusClientOperationExecutionDescription describe(); + + void cancel(); + + void cancel(@Nullable String reason); + + void terminate(); + + void terminate(@Nullable String reason); + + void delete(); +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClient.java b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClient.java new file mode 100644 index 000000000..493c559fe --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClient.java @@ -0,0 +1,45 @@ +package io.temporal.client; + +import io.temporal.common.Experimental; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput; +import io.temporal.workflow.NexusOperationOptions; +import java.lang.reflect.Type; +import javax.annotation.Nullable; + +/** Untyped client for invoking standalone Nexus operations by operation-name string. */ +@Experimental +public interface UntypedNexusServiceClient { + + /** Start an operation by name, returning an untyped handle. */ + UntypedNexusClientHandle start( + String operation, NexusOperationOptions options, @Nullable Object arg); + + /** Execute an operation synchronously by name. */ + R execute( + String operation, Class resultClass, NexusOperationOptions options, @Nullable Object arg); + + /** Execute an operation synchronously by name with explicit generic-result {@link Type}. */ + R execute( + String operation, + Class resultClass, + Type resultType, + NexusOperationOptions options, + @Nullable Object arg); + + /** + * List operation executions whose service matches the service this client targets. Any + * user-supplied query is ANDed with a {@code Service="..."} filter. + */ + ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input); + + /** + * Count operation executions whose service matches the service this client targets. Any + * user-supplied query is ANDed with a {@code Service="..."} filter. + */ + CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input); +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClientImpl.java b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClientImpl.java new file mode 100644 index 000000000..45bb8262c --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClientImpl.java @@ -0,0 +1,130 @@ +package io.temporal.client; + +import io.temporal.api.common.v1.Payload; +import io.temporal.api.common.v1.SearchAttributes; +import io.temporal.common.Experimental; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionOutput; +import io.temporal.internal.common.SearchAttributesUtil; +import io.temporal.workflow.NexusOperationOptions; +import java.lang.reflect.Type; +import java.util.UUID; +import javax.annotation.Nullable; + +/** + * Untyped Nexus service client. Holds the {@link NexusClient}, target endpoint, service name, and + * data converter, and translates operation-name calls into {@link + * NexusClient#startNexusOperationExecution} invocations. + */ +@Experimental +class UntypedNexusServiceClientImpl implements UntypedNexusServiceClient { + + private final NexusClient client; + private final String endpoint; + private final String serviceName; + private final DataConverter dataConverter; + private final NexusClientOperationOptions clientOptions; + + UntypedNexusServiceClientImpl( + NexusClient client, + String endpoint, + String serviceName, + NexusClientOperationOptions clientOptions) { + if (client == null || endpoint == null || serviceName == null || clientOptions == null) { + throw new IllegalArgumentException( + "client, endpoint, serviceName, and clientOptions are all required"); + } + this.client = client; + this.endpoint = endpoint; + this.serviceName = serviceName; + this.dataConverter = clientOptions.getDataConverter(); + this.clientOptions = clientOptions; + } + + @Override + public UntypedNexusClientHandle start( + String operation, NexusOperationOptions options, @Nullable Object arg) { + Payload payload = serializeInput(arg); + String operationId = UUID.randomUUID().toString(); + @Nullable + SearchAttributes searchAttributes = + SearchAttributesUtil.encodeTyped(clientOptions.getSearchAttributes()); + StartNexusOperationExecutionInput input = + new StartNexusOperationExecutionInput( + operationId, + endpoint, + serviceName, + operation, + options == null ? null : options.getScheduleToCloseTimeout(), + options == null ? null : options.getScheduleToStartTimeout(), + options == null ? null : options.getStartToCloseTimeout(), + payload, + searchAttributes, + /* nexusHeader= */ null, + options == null ? clientOptions.getSummary() : options.getSummary(), + clientOptions.getIdReusePolicy(), + clientOptions.getIdConflictPolicy()); + StartNexusOperationExecutionOutput output = client.startNexusOperationExecution(input); + return client.getHandle(operationId, output.getRunId()); + } + + @Override + public R execute( + String operation, Class resultClass, NexusOperationOptions options, @Nullable Object arg) { + return execute(operation, resultClass, /* resultType= */ null, options, arg); + } + + @Override + public R execute( + String operation, + Class resultClass, + @Nullable Type resultType, + NexusOperationOptions options, + @Nullable Object arg) { + UntypedNexusClientHandle handle = start(operation, options, arg); + return NexusClientHandle.fromUntyped(handle, resultClass, resultType).getResult(); + } + + @Override + public ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input) { + String scopedQuery = scopeQuery(input.getQuery().orElse(null)); + return client.listNexusOperationExecutions( + new ListNexusOperationExecutionsInput( + scopedQuery, input.getPageSize(), input.getNextPageToken().orElse(null))); + } + + @Override + public CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input) { + String scopedQuery = scopeQuery(input.getQuery().orElse(null)); + return client.countNexusOperationExecutions( + new CountNexusOperationExecutionsInput(scopedQuery)); + } + + private String scopeQuery(@Nullable String userQuery) { + String serviceFilter = "Service=\"" + serviceName + "\""; + if (userQuery == null || userQuery.isEmpty()) { + return serviceFilter; + } + return serviceFilter + " AND (" + userQuery + ")"; + } + + private @Nullable Payload serializeInput(@Nullable Object arg) { + if (arg == null) { + return null; + } + Class argClass = arg.getClass(); + return dataConverter + .toPayload(arg) + .orElseThrow( + () -> + new IllegalStateException( + "DataConverter returned no payload for input of type " + argClass.getName())); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptor.java new file mode 100644 index 000000000..7212bbe1a --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptor.java @@ -0,0 +1,496 @@ +package io.temporal.common.interceptors; + +import com.google.protobuf.ByteString; +import io.grpc.Deadline; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.common.v1.SearchAttributes; +import io.temporal.api.enums.v1.NexusOperationIdConflictPolicy; +import io.temporal.api.enums.v1.NexusOperationIdReusePolicy; +import io.temporal.api.enums.v1.NexusOperationWaitStage; +import io.temporal.api.failure.v1.Failure; +import io.temporal.api.nexus.v1.NexusOperationExecutionListInfo; +import io.temporal.client.NexusClient; +import io.temporal.client.NexusClientHandle; +import io.temporal.client.NexusClientOperationExecutionDescription; +import io.temporal.common.Experimental; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Per-call interceptor for {@link NexusClient} and {@link NexusClientHandle} operations on + * standalone Nexus operation executions. + * + *

Implementations are produced by {@link + * NexusClientInterceptor#nexusClientCallsInterceptor(NexusClientCallsInterceptor)} during {@link + * NexusClient} construction. Prefer extending {@link NexusClientCallsInterceptorBase} and + * overriding only the methods you need. + */ +@Experimental +public interface NexusClientCallsInterceptor { + + StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input); + + DescribeNexusOperationExecutionOutput describeNexusOperationExecution( + DescribeNexusOperationExecutionInput input); + + CompletableFuture describeNexusOperationExecutionAsync( + DescribeNexusOperationExecutionInput input); + + PollNexusOperationExecutionOutput pollNexusOperationExecution( + PollNexusOperationExecutionInput input); + + CompletableFuture pollNexusOperationExecutionAsync( + PollNexusOperationExecutionInput input); + + ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input); + + CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input); + + void requestCancelNexusOperationExecution(RequestCancelNexusOperationExecutionInput input); + + void terminateNexusOperationExecution(TerminateNexusOperationExecutionInput input); + + void deleteNexusOperationExecution(DeleteNexusOperationExecutionInput input); + + final class StartNexusOperationExecutionInput { + private final String operationId; + private final String endpoint; + private final String service; + private final String operation; + private final @Nullable Duration scheduleToCloseTimeout; + private final @Nullable Duration scheduleToStartTimeout; + private final @Nullable Duration startToCloseTimeout; + private final @Nullable Payload input; + private final @Nullable SearchAttributes searchAttributes; + private final Map nexusHeader; + private final @Nullable String summary; + private final @Nullable NexusOperationIdReusePolicy idReusePolicy; + private final @Nullable NexusOperationIdConflictPolicy idConflictPolicy; + + /** + * Legacy constructor without per-call timeout overloads or summary; delegates to the primary. + */ + public StartNexusOperationExecutionInput( + String operationId, + String endpoint, + String service, + String operation, + @Nullable Duration scheduleToCloseTimeout, + @Nullable Payload input, + @Nullable SearchAttributes searchAttributes, + @Nullable Map nexusHeader) { + this( + operationId, + endpoint, + service, + operation, + scheduleToCloseTimeout, + /* scheduleToStartTimeout= */ null, + /* startToCloseTimeout= */ null, + input, + searchAttributes, + nexusHeader, + /* summary= */ null, + /* idReusePolicy= */ null, + /* idConflictPolicy= */ null); + } + + public StartNexusOperationExecutionInput( + String operationId, + String endpoint, + String service, + String operation, + @Nullable Duration scheduleToCloseTimeout, + @Nullable Duration scheduleToStartTimeout, + @Nullable Duration startToCloseTimeout, + @Nullable Payload input, + @Nullable SearchAttributes searchAttributes, + @Nullable Map nexusHeader, + @Nullable String summary, + @Nullable NexusOperationIdReusePolicy idReusePolicy, + @Nullable NexusOperationIdConflictPolicy idConflictPolicy) { + this.operationId = operationId; + this.endpoint = endpoint; + this.service = service; + this.operation = operation; + this.scheduleToCloseTimeout = scheduleToCloseTimeout; + this.scheduleToStartTimeout = scheduleToStartTimeout; + this.startToCloseTimeout = startToCloseTimeout; + this.input = input; + this.searchAttributes = searchAttributes; + this.nexusHeader = + nexusHeader == null ? Collections.emptyMap() : Collections.unmodifiableMap(nexusHeader); + this.summary = summary; + this.idReusePolicy = idReusePolicy; + this.idConflictPolicy = idConflictPolicy; + } + + public String getOperationId() { + return operationId; + } + + public String getEndpoint() { + return endpoint; + } + + public String getService() { + return service; + } + + public String getOperation() { + return operation; + } + + public Optional getScheduleToCloseTimeout() { + return Optional.ofNullable(scheduleToCloseTimeout); + } + + public Optional getScheduleToStartTimeout() { + return Optional.ofNullable(scheduleToStartTimeout); + } + + public Optional getStartToCloseTimeout() { + return Optional.ofNullable(startToCloseTimeout); + } + + public Optional getInput() { + return Optional.ofNullable(input); + } + + public Optional getSearchAttributes() { + return Optional.ofNullable(searchAttributes); + } + + public Map getNexusHeader() { + return nexusHeader; + } + + public Optional getSummary() { + return Optional.ofNullable(summary); + } + + public Optional getIdReusePolicy() { + return Optional.ofNullable(idReusePolicy); + } + + public Optional getIdConflictPolicy() { + return Optional.ofNullable(idConflictPolicy); + } + } + + final class StartNexusOperationExecutionOutput { + private final String runId; + private final boolean started; + + public StartNexusOperationExecutionOutput(String runId, boolean started) { + this.runId = runId; + this.started = started; + } + + public String getRunId() { + return runId; + } + + public boolean isStarted() { + return started; + } + } + + final class DescribeNexusOperationExecutionInput { + private final String operationId; + private final @Nullable String runId; + private final boolean includeInput; + private final boolean includeOutcome; + private final @Nonnull Deadline deadline; + + public DescribeNexusOperationExecutionInput( + String operationId, + @Nullable String runId, + boolean includeInput, + boolean includeOutcome, + @Nonnull Deadline deadline) { + this.operationId = operationId; + this.runId = runId; + this.includeInput = includeInput; + this.includeOutcome = includeOutcome; + this.deadline = deadline; + } + + public String getOperationId() { + return operationId; + } + + public Optional getRunId() { + return Optional.ofNullable(runId); + } + + public boolean isIncludeInput() { + return includeInput; + } + + public boolean isIncludeOutcome() { + return includeOutcome; + } + + public Deadline getDeadline() { + return deadline; + } + } + + final class DescribeNexusOperationExecutionOutput { + private final NexusClientOperationExecutionDescription description; + + public DescribeNexusOperationExecutionOutput( + NexusClientOperationExecutionDescription description) { + this.description = description; + } + + public NexusClientOperationExecutionDescription getDescription() { + return description; + } + } + + final class PollNexusOperationExecutionInput { + private final String operationId; + private final @Nullable String runId; + private final NexusOperationWaitStage waitStage; + private final @Nonnull Deadline deadline; + + public PollNexusOperationExecutionInput( + String operationId, + @Nullable String runId, + NexusOperationWaitStage waitStage, + @Nonnull Deadline deadline) { + this.operationId = operationId; + this.runId = runId; + this.waitStage = waitStage; + this.deadline = deadline; + } + + public String getOperationId() { + return operationId; + } + + public Optional getRunId() { + return Optional.ofNullable(runId); + } + + public NexusOperationWaitStage getWaitStage() { + return waitStage; + } + + public Deadline getDeadline() { + return deadline; + } + } + + final class PollNexusOperationExecutionOutput { + private final String runId; + private final NexusOperationWaitStage waitStage; + private final String operationToken; + private final @Nullable Payload result; + private final @Nullable Failure failure; + + public PollNexusOperationExecutionOutput( + String runId, + NexusOperationWaitStage waitStage, + String operationToken, + @Nullable Payload result, + @Nullable Failure failure) { + this.runId = runId; + this.waitStage = waitStage; + this.operationToken = operationToken; + this.result = result; + this.failure = failure; + } + + public String getRunId() { + return runId; + } + + public NexusOperationWaitStage getWaitStage() { + return waitStage; + } + + public String getOperationToken() { + return operationToken; + } + + public Optional getResult() { + return Optional.ofNullable(result); + } + + public Optional getFailure() { + return Optional.ofNullable(failure); + } + } + + final class ListNexusOperationExecutionsInput { + private final @Nullable String query; + private final int pageSize; + private final @Nullable ByteString nextPageToken; + + public ListNexusOperationExecutionsInput( + @Nullable String query, int pageSize, @Nullable ByteString nextPageToken) { + this.query = query; + this.pageSize = pageSize; + this.nextPageToken = nextPageToken; + } + + public Optional getQuery() { + return Optional.ofNullable(query); + } + + public int getPageSize() { + return pageSize; + } + + public Optional getNextPageToken() { + return Optional.ofNullable(nextPageToken); + } + } + + final class ListNexusOperationExecutionsOutput { + private final List operations; + private final ByteString nextPageToken; + + public ListNexusOperationExecutionsOutput( + List operations, ByteString nextPageToken) { + this.operations = Collections.unmodifiableList(operations); + this.nextPageToken = nextPageToken; + } + + public List getOperations() { + return operations; + } + + public ByteString getNextPageToken() { + return nextPageToken; + } + } + + final class CountNexusOperationExecutionsInput { + private final @Nullable String query; + + public CountNexusOperationExecutionsInput(@Nullable String query) { + this.query = query; + } + + public Optional getQuery() { + return Optional.ofNullable(query); + } + } + + final class CountNexusOperationExecutionsOutput { + private final long count; + private final List groups; + + public CountNexusOperationExecutionsOutput(long count, List groups) { + this.count = count; + this.groups = Collections.unmodifiableList(groups); + } + + public long getCount() { + return count; + } + + public List getGroups() { + return groups; + } + + public static final class AggregationGroup { + private final List groupValues; + private final long count; + + public AggregationGroup(List groupValues, long count) { + this.groupValues = Collections.unmodifiableList(groupValues); + this.count = count; + } + + public List getGroupValues() { + return groupValues; + } + + public long getCount() { + return count; + } + } + } + + final class RequestCancelNexusOperationExecutionInput { + private final String operationId; + private final @Nullable String runId; + private final @Nullable String reason; + + public RequestCancelNexusOperationExecutionInput( + String operationId, @Nullable String runId, @Nullable String reason) { + this.operationId = operationId; + this.runId = runId; + this.reason = reason; + } + + public String getOperationId() { + return operationId; + } + + public Optional getRunId() { + return Optional.ofNullable(runId); + } + + public Optional getReason() { + return Optional.ofNullable(reason); + } + } + + final class TerminateNexusOperationExecutionInput { + private final String operationId; + private final @Nullable String runId; + private final @Nullable String reason; + + public TerminateNexusOperationExecutionInput( + String operationId, @Nullable String runId, @Nullable String reason) { + this.operationId = operationId; + this.runId = runId; + this.reason = reason; + } + + public String getOperationId() { + return operationId; + } + + public Optional getRunId() { + return Optional.ofNullable(runId); + } + + public Optional getReason() { + return Optional.ofNullable(reason); + } + } + + final class DeleteNexusOperationExecutionInput { + private final String operationId; + private final @Nullable String runId; + + public DeleteNexusOperationExecutionInput(String operationId, @Nullable String runId) { + this.operationId = operationId; + this.runId = runId; + } + + public String getOperationId() { + return operationId; + } + + public Optional getRunId() { + return Optional.ofNullable(runId); + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptorBase.java new file mode 100644 index 000000000..a2b13794f --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptorBase.java @@ -0,0 +1,76 @@ +package io.temporal.common.interceptors; + +import io.temporal.common.Experimental; +import java.util.concurrent.CompletableFuture; + +/** + * Convenience base class for {@link NexusClientCallsInterceptor} implementations that need to + * override only a subset of methods. All methods delegate to the wrapped {@code next} interceptor. + */ +@Experimental +public class NexusClientCallsInterceptorBase implements NexusClientCallsInterceptor { + + private final NexusClientCallsInterceptor next; + + public NexusClientCallsInterceptorBase(NexusClientCallsInterceptor next) { + this.next = next; + } + + @Override + public StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input) { + return next.startNexusOperationExecution(input); + } + + @Override + public DescribeNexusOperationExecutionOutput describeNexusOperationExecution( + DescribeNexusOperationExecutionInput input) { + return next.describeNexusOperationExecution(input); + } + + @Override + public CompletableFuture + describeNexusOperationExecutionAsync(DescribeNexusOperationExecutionInput input) { + return next.describeNexusOperationExecutionAsync(input); + } + + @Override + public PollNexusOperationExecutionOutput pollNexusOperationExecution( + PollNexusOperationExecutionInput input) { + return next.pollNexusOperationExecution(input); + } + + @Override + public CompletableFuture pollNexusOperationExecutionAsync( + PollNexusOperationExecutionInput input) { + return next.pollNexusOperationExecutionAsync(input); + } + + @Override + public ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input) { + return next.listNexusOperationExecutions(input); + } + + @Override + public CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input) { + return next.countNexusOperationExecutions(input); + } + + @Override + public void requestCancelNexusOperationExecution( + RequestCancelNexusOperationExecutionInput input) { + next.requestCancelNexusOperationExecution(input); + } + + @Override + public void terminateNexusOperationExecution(TerminateNexusOperationExecutionInput input) { + next.terminateNexusOperationExecution(input); + } + + @Override + public void deleteNexusOperationExecution(DeleteNexusOperationExecutionInput input) { + next.deleteNexusOperationExecution(input); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptor.java new file mode 100644 index 000000000..a2ce4945f --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptor.java @@ -0,0 +1,24 @@ +package io.temporal.common.interceptors; + +import io.temporal.client.NexusClient; +import io.temporal.client.NexusClientOperationOptions; +import io.temporal.common.Experimental; + +/** + * Outer interceptor for {@link NexusClient}. Implementations are registered via {@link + * NexusClientOperationOptions.Builder#setInterceptors(java.util.List)} and consulted once during + * client construction to build the chain of {@link NexusClientCallsInterceptor}s that wraps the + * root invoker. + */ +@Experimental +public interface NexusClientInterceptor { + + /** + * Called once during {@link NexusClient} construction to build the chain of per-call + * interceptors. + * + * @param next next per-call interceptor in the chain + * @return new per-call interceptor that decorates calls to {@code next} + */ + NexusClientCallsInterceptor nexusClientCallsInterceptor(NexusClientCallsInterceptor next); +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptorBase.java new file mode 100644 index 000000000..b964626fd --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptorBase.java @@ -0,0 +1,13 @@ +package io.temporal.common.interceptors; + +import io.temporal.common.Experimental; + +/** Convenience base class for {@link NexusClientInterceptor} implementations. */ +@Experimental +public class NexusClientInterceptorBase implements NexusClientInterceptor { + + @Override + public NexusClientCallsInterceptor nexusClientCallsInterceptor(NexusClientCallsInterceptor next) { + return next; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootNexusClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootNexusClientInvoker.java new file mode 100644 index 000000000..5814d8c65 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootNexusClientInvoker.java @@ -0,0 +1,228 @@ +package io.temporal.internal.client; + +import io.temporal.api.sdk.v1.UserMetadata; +import io.temporal.api.workflowservice.v1.CountNexusOperationExecutionsRequest; +import io.temporal.api.workflowservice.v1.CountNexusOperationExecutionsResponse; +import io.temporal.api.workflowservice.v1.DeleteNexusOperationExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeNexusOperationExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeNexusOperationExecutionResponse; +import io.temporal.api.workflowservice.v1.ListNexusOperationExecutionsRequest; +import io.temporal.api.workflowservice.v1.ListNexusOperationExecutionsResponse; +import io.temporal.api.workflowservice.v1.PollNexusOperationExecutionRequest; +import io.temporal.api.workflowservice.v1.PollNexusOperationExecutionResponse; +import io.temporal.api.workflowservice.v1.RequestCancelNexusOperationExecutionRequest; +import io.temporal.api.workflowservice.v1.StartNexusOperationExecutionRequest; +import io.temporal.api.workflowservice.v1.StartNexusOperationExecutionResponse; +import io.temporal.api.workflowservice.v1.TerminateNexusOperationExecutionRequest; +import io.temporal.client.NexusClientOperationExecutionDescription; +import io.temporal.client.NexusClientOperationOptions; +import io.temporal.common.Experimental; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.internal.client.external.GenericWorkflowClient; +import io.temporal.internal.common.ProtobufTimeUtils; +import io.temporal.internal.common.WorkflowExecutionUtils; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * Root implementation of {@link NexusClientCallsInterceptor} that converts the SDK's Java DTOs into + * proto requests and delegates the actual gRPC calls to {@link GenericWorkflowClient}. + */ +@Experimental +public class RootNexusClientInvoker implements NexusClientCallsInterceptor { + + private final GenericWorkflowClient genericClient; + private final NexusClientOperationOptions clientOptions; + + public RootNexusClientInvoker( + GenericWorkflowClient genericClient, NexusClientOperationOptions clientOptions) { + this.genericClient = genericClient; + this.clientOptions = clientOptions; + } + + @Override + public StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input) { + StartNexusOperationExecutionRequest.Builder request = + StartNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setRequestId(UUID.randomUUID().toString()) + .setOperationId(input.getOperationId()) + .setEndpoint(input.getEndpoint()) + .setService(input.getService()) + .setOperation(input.getOperation()) + .putAllNexusHeader(input.getNexusHeader()); + + input + .getScheduleToCloseTimeout() + .ifPresent(d -> request.setScheduleToCloseTimeout(ProtobufTimeUtils.toProtoDuration(d))); + input + .getScheduleToStartTimeout() + .ifPresent(d -> request.setScheduleToStartTimeout(ProtobufTimeUtils.toProtoDuration(d))); + input + .getStartToCloseTimeout() + .ifPresent(d -> request.setStartToCloseTimeout(ProtobufTimeUtils.toProtoDuration(d))); + input.getInput().ifPresent(request::setInput); + input.getSearchAttributes().ifPresent(request::setSearchAttributes); + input.getIdReusePolicy().ifPresent(request::setIdReusePolicy); + input.getIdConflictPolicy().ifPresent(request::setIdConflictPolicy); + input + .getSummary() + .ifPresent( + summary -> { + UserMetadata metadata = + WorkflowExecutionUtils.makeUserMetaData( + summary, /* details= */ null, clientOptions.getDataConverter()); + if (metadata != null) { + request.setUserMetadata(metadata); + } + }); + + StartNexusOperationExecutionResponse response = + genericClient.startNexusOperationExecution(request.build()); + return new StartNexusOperationExecutionOutput(response.getRunId(), response.getStarted()); + } + + @Override + public DescribeNexusOperationExecutionOutput describeNexusOperationExecution( + DescribeNexusOperationExecutionInput input) { + DescribeNexusOperationExecutionRequest request = buildDescribeRequest(input); + DescribeNexusOperationExecutionResponse response = + genericClient.describeNexusOperationExecution(request, input.getDeadline()); + return new DescribeNexusOperationExecutionOutput( + new NexusClientOperationExecutionDescription(response)); + } + + @Override + public CompletableFuture + describeNexusOperationExecutionAsync(DescribeNexusOperationExecutionInput input) { + DescribeNexusOperationExecutionRequest request = buildDescribeRequest(input); + return genericClient + .describeNexusOperationExecutionAsync(request, input.getDeadline()) + .thenApply( + response -> + new DescribeNexusOperationExecutionOutput( + new NexusClientOperationExecutionDescription(response))); + } + + private DescribeNexusOperationExecutionRequest buildDescribeRequest( + DescribeNexusOperationExecutionInput input) { + DescribeNexusOperationExecutionRequest.Builder request = + DescribeNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setOperationId(input.getOperationId()) + .setIncludeInput(input.isIncludeInput()) + .setIncludeOutcome(input.isIncludeOutcome()); + input.getRunId().ifPresent(request::setRunId); + return request.build(); + } + + @Override + public PollNexusOperationExecutionOutput pollNexusOperationExecution( + PollNexusOperationExecutionInput input) { + PollNexusOperationExecutionResponse response = + genericClient.pollNexusOperationExecution(buildPollRequest(input), input.getDeadline()); + return toPollOutput(response); + } + + @Override + public CompletableFuture pollNexusOperationExecutionAsync( + PollNexusOperationExecutionInput input) { + return genericClient + .pollNexusOperationExecutionAsync(buildPollRequest(input), input.getDeadline()) + .thenApply(this::toPollOutput); + } + + private PollNexusOperationExecutionRequest buildPollRequest( + PollNexusOperationExecutionInput input) { + PollNexusOperationExecutionRequest.Builder request = + PollNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setOperationId(input.getOperationId()) + .setWaitStage(input.getWaitStage()); + input.getRunId().ifPresent(request::setRunId); + return request.build(); + } + + private PollNexusOperationExecutionOutput toPollOutput( + PollNexusOperationExecutionResponse response) { + return new PollNexusOperationExecutionOutput( + response.getRunId(), + response.getWaitStage(), + response.getOperationToken(), + response.hasResult() ? response.getResult() : null, + response.hasFailure() ? response.getFailure() : null); + } + + @Override + public ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input) { + ListNexusOperationExecutionsRequest.Builder request = + ListNexusOperationExecutionsRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setPageSize(input.getPageSize()); + input.getQuery().ifPresent(request::setQuery); + input.getNextPageToken().ifPresent(request::setNextPageToken); + + ListNexusOperationExecutionsResponse response = + genericClient.listNexusOperationExecutions(request.build()); + return new ListNexusOperationExecutionsOutput( + response.getOperationsList(), response.getNextPageToken()); + } + + @Override + public CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input) { + CountNexusOperationExecutionsRequest.Builder request = + CountNexusOperationExecutionsRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()); + input.getQuery().ifPresent(request::setQuery); + + CountNexusOperationExecutionsResponse response = + genericClient.countNexusOperationExecutions(request.build()); + + java.util.List groups = + new java.util.ArrayList<>(response.getGroupsCount()); + for (CountNexusOperationExecutionsResponse.AggregationGroup g : response.getGroupsList()) { + groups.add( + new CountNexusOperationExecutionsOutput.AggregationGroup( + g.getGroupValuesList(), g.getCount())); + } + return new CountNexusOperationExecutionsOutput(response.getCount(), groups); + } + + @Override + public void requestCancelNexusOperationExecution( + RequestCancelNexusOperationExecutionInput input) { + RequestCancelNexusOperationExecutionRequest.Builder request = + RequestCancelNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setRequestId(UUID.randomUUID().toString()) + .setOperationId(input.getOperationId()); + input.getRunId().ifPresent(request::setRunId); + input.getReason().ifPresent(request::setReason); + genericClient.requestCancelNexusOperationExecution(request.build()); + } + + @Override + public void terminateNexusOperationExecution(TerminateNexusOperationExecutionInput input) { + TerminateNexusOperationExecutionRequest.Builder request = + TerminateNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setRequestId(UUID.randomUUID().toString()) + .setOperationId(input.getOperationId()); + input.getRunId().ifPresent(request::setRunId); + input.getReason().ifPresent(request::setReason); + genericClient.terminateNexusOperationExecution(request.build()); + } + + @Override + public void deleteNexusOperationExecution(DeleteNexusOperationExecutionInput input) { + DeleteNexusOperationExecutionRequest.Builder request = + DeleteNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setOperationId(input.getOperationId()); + input.getRunId().ifPresent(request::setRunId); + genericClient.deleteNexusOperationExecution(request.build()); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java index 1b7bf57c9..b74da549e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java @@ -61,6 +61,36 @@ CompletableFuture listWorkflowExecutionsAsync( DescribeWorkflowExecutionResponse describeWorkflowExecution( DescribeWorkflowExecutionRequest request); + StartNexusOperationExecutionResponse startNexusOperationExecution( + @Nonnull StartNexusOperationExecutionRequest request); + + DescribeNexusOperationExecutionResponse describeNexusOperationExecution( + @Nonnull DescribeNexusOperationExecutionRequest request, @Nonnull Deadline deadline); + + CompletableFuture describeNexusOperationExecutionAsync( + @Nonnull DescribeNexusOperationExecutionRequest request, @Nonnull Deadline deadline); + + PollNexusOperationExecutionResponse pollNexusOperationExecution( + @Nonnull PollNexusOperationExecutionRequest request, @Nonnull Deadline deadline); + + CompletableFuture pollNexusOperationExecutionAsync( + @Nonnull PollNexusOperationExecutionRequest request, @Nonnull Deadline deadline); + + ListNexusOperationExecutionsResponse listNexusOperationExecutions( + @Nonnull ListNexusOperationExecutionsRequest request); + + CountNexusOperationExecutionsResponse countNexusOperationExecutions( + @Nonnull CountNexusOperationExecutionsRequest request); + + RequestCancelNexusOperationExecutionResponse requestCancelNexusOperationExecution( + @Nonnull RequestCancelNexusOperationExecutionRequest request); + + TerminateNexusOperationExecutionResponse terminateNexusOperationExecution( + @Nonnull TerminateNexusOperationExecutionRequest request); + + DeleteNexusOperationExecutionResponse deleteNexusOperationExecution( + @Nonnull DeleteNexusOperationExecutionRequest request); + @Experimental @Deprecated UpdateWorkerBuildIdCompatibilityResponse updateWorkerBuildIdCompatability( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java index cd33a532a..b5f6f902b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java @@ -309,6 +309,141 @@ public DescribeWorkflowExecutionResponse describeWorkflowExecution( grpcRetryerOptions); } + // TODO -- EVAN -- START + @Override + public StartNexusOperationExecutionResponse startNexusOperationExecution( + @Nonnull StartNexusOperationExecutionRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .startNexusOperationExecution(request), + grpcRetryerOptions); + } + + @Override + public DescribeNexusOperationExecutionResponse describeNexusOperationExecution( + @Nonnull DescribeNexusOperationExecutionRequest request, @Nonnull Deadline deadline) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true) + .withDeadline(deadline) + .describeNexusOperationExecution(request), + new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline)); + } + + @Override + public CompletableFuture + describeNexusOperationExecutionAsync( + @Nonnull DescribeNexusOperationExecutionRequest request, @Nonnull Deadline deadline) { + return grpcRetryer.retryWithResultAsync( + asyncThrottlerExecutor, + () -> + toCompletableFuture( + service + .futureStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true) + .withDeadline(deadline) + .describeNexusOperationExecution(request)), + new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline)); + } + + @Override + public PollNexusOperationExecutionResponse pollNexusOperationExecution( + @Nonnull PollNexusOperationExecutionRequest request, @Nonnull Deadline deadline) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true) + .withDeadline(deadline) + .pollNexusOperationExecution(request), + new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline)); + } + + @Override + public CompletableFuture pollNexusOperationExecutionAsync( + @Nonnull PollNexusOperationExecutionRequest request, @Nonnull Deadline deadline) { + return grpcRetryer.retryWithResultAsync( + asyncThrottlerExecutor, + () -> + toCompletableFuture( + service + .futureStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true) + .withDeadline(deadline) + .pollNexusOperationExecution(request)), + new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline)); + } + + @Override + public ListNexusOperationExecutionsResponse listNexusOperationExecutions( + @Nonnull ListNexusOperationExecutionsRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .listNexusOperationExecutions(request), + grpcRetryerOptions); + } + + @Override + public CountNexusOperationExecutionsResponse countNexusOperationExecutions( + @Nonnull CountNexusOperationExecutionsRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .countNexusOperationExecutions(request), + grpcRetryerOptions); + } + + @Override + public RequestCancelNexusOperationExecutionResponse requestCancelNexusOperationExecution( + @Nonnull RequestCancelNexusOperationExecutionRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .requestCancelNexusOperationExecution(request), + grpcRetryerOptions); + } + + @Override + public TerminateNexusOperationExecutionResponse terminateNexusOperationExecution( + @Nonnull TerminateNexusOperationExecutionRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .terminateNexusOperationExecution(request), + grpcRetryerOptions); + } + + @Override + public DeleteNexusOperationExecutionResponse deleteNexusOperationExecution( + @Nonnull DeleteNexusOperationExecutionRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .deleteNexusOperationExecution(request), + grpcRetryerOptions); + } + + // TODO -- EVAN -- END private static CompletableFuture toCompletableFuture( ListenableFuture listenableFuture) { CompletableFuture result = new CompletableFuture<>(); diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationOptions.java index 0952f6853..3d3067dd0 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationOptions.java @@ -5,7 +5,7 @@ import java.util.Objects; /** - * NexusOperationOptions is used to specify the options for starting a Nexus operation from a + * NexusClientOperationOptions is used to specify the options for starting a Nexus operation from a * Workflow. * *

Use {@link NexusOperationOptions#newBuilder()} to construct an instance. @@ -228,7 +228,7 @@ public int hashCode() { @Override public String toString() { - return "NexusOperationOptions{" + return "NexusClientOperationOptions{" + "scheduleToCloseTimeout=" + scheduleToCloseTimeout + ", scheduleToStartTimeout=" diff --git a/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientHandleTest.java b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientHandleTest.java new file mode 100644 index 000000000..1c1623b3b --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientHandleTest.java @@ -0,0 +1,375 @@ +package io.temporal.client.nexus; + +import com.google.protobuf.ByteString; +import io.nexusrpc.OperationException; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.api.nexus.v1.EndpointSpec; +import io.temporal.api.nexus.v1.EndpointTarget; +import io.temporal.api.operatorservice.v1.CreateNexusEndpointRequest; +import io.temporal.api.operatorservice.v1.CreateNexusEndpointResponse; +import io.temporal.api.operatorservice.v1.DeleteNexusEndpointRequest; +import io.temporal.client.NexusClient; +import io.temporal.client.NexusClientImpl; +import io.temporal.client.NexusClientOperationExecutionDescription; +import io.temporal.client.NexusClientOperationOptions; +import io.temporal.client.UntypedNexusClientHandle; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import java.util.UUID; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for {@link UntypedNexusClientHandle} per-execution lifecycle methods returned by {@link + * NexusClient#getHandle(String)}: {@code describe()}, {@code cancel()}/{@code cancel(reason)}, and + * {@code terminate()}/{@code terminate(reason)}. + */ +public class NexusClientHandleTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(PlaceholderWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + // Default is 10s; standalone Nexus dispatch + worker poll can take longer. + .setTestTimeoutSeconds(120) + .build(); + + private NexusClient createNexusClient() { + return NexusClientImpl.newInstance( + testWorkflowRule.getWorkflowServiceStubs(), + NexusClientOperationOptions.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .build()); + } + + @Test + public void describeReturnsDescriptionForStartedOperation() { + StartedOperation started = startOperation(); + try { + UntypedNexusClientHandle handle = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + NexusClientOperationExecutionDescription description = handle.describe(); + + Assert.assertNotNull(description); + Assert.assertNotNull(description.getRunId()); + Assert.assertEquals(started.startOutput.getRunId(), description.getRunId()); + Assert.assertNotNull(description.getRawResponse()); + } finally { + cleanup(started); + } + } + + @Test + public void describeWithoutRunIdTargetsLatest() { + StartedOperation started = startOperation(); + try { + // Handle with no pinned run ID — server should resolve to the latest run. + UntypedNexusClientHandle handle = started.client.getHandle(started.operationId); + + NexusClientOperationExecutionDescription description = handle.describe(); + + Assert.assertNotNull(description); + Assert.assertEquals(started.startOutput.getRunId(), description.getRunId()); + } finally { + cleanup(started); + } + } + + @Test + public void cancelSucceedsForStartedOperation() { + StartedOperation started = startOperation(); + try { + UntypedNexusClientHandle handle = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + handle.cancel(); + // No exception — server accepted the cancel request. + } finally { + cleanup(started); + } + } + + @Test + public void cancelWithReasonSucceedsForStartedOperation() { + StartedOperation started = startOperation(); + try { + UntypedNexusClientHandle handle = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + handle.cancel("test-cancel-reason"); + } finally { + cleanup(started); + } + } + + @Test + public void cancelWithNullReasonSucceeds() { + StartedOperation started = startOperation(); + try { + UntypedNexusClientHandle handle = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + handle.cancel(null); + } finally { + cleanup(started); + } + } + + @Test + public void terminateSucceedsForStartedOperation() { + StartedOperation started = startOperation(); + try { + UntypedNexusClientHandle handle = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + handle.terminate(); + } finally { + cleanup(started); + } + } + + @Test + public void terminateWithReasonSucceedsForStartedOperation() { + StartedOperation started = startOperation(); + try { + UntypedNexusClientHandle handle = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + handle.terminate("test-terminate-reason"); + } finally { + cleanup(started); + } + } + + @Test + public void terminateWithNullReasonSucceeds() { + StartedOperation started = startOperation(); + try { + UntypedNexusClientHandle handle = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + handle.terminate(null); + } finally { + cleanup(started); + } + } + + @Test + public void getResultReturnsTypedResultForSyncOperation() { + StartedOperation started = startOperation(); + try { + UntypedNexusClientHandle untyped = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + String result = + io.temporal.client.NexusClientHandle.fromUntyped(untyped, String.class).getResult(); + + Assert.assertNotNull(result); + Assert.assertTrue("expected echo: prefix, got: " + result, result.startsWith("echo:ping-")); + } finally { + cleanup(started); + } + } + + @Test + public void getResultUntypedReturnsResultForSyncOperation() { + StartedOperation started = startOperation(); + try { + UntypedNexusClientHandle handle = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + String result = handle.getResult(String.class); + + Assert.assertNotNull(result); + Assert.assertTrue(result.startsWith("echo:ping-")); + } finally { + cleanup(started); + } + } + + @Test + public void getResultAsyncReturnsTypedResultForSyncOperation() throws Exception { + StartedOperation started = startOperation(); + try { + UntypedNexusClientHandle untyped = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + String result = + io.temporal.client.NexusClientHandle.fromUntyped(untyped, String.class) + .getResultAsync() + .get(60, java.util.concurrent.TimeUnit.SECONDS); + + Assert.assertNotNull(result); + Assert.assertTrue(result.startsWith("echo:ping-")); + } finally { + cleanup(started); + } + } + + /** Holder for state used to drive a single test against one started operation. */ + private static final class StartedOperation { + final NexusClient client; + final Endpoint endpoint; + final String operationId; + final NexusClientCallsInterceptor.StartNexusOperationExecutionOutput startOutput; + + StartedOperation( + NexusClient client, + Endpoint endpoint, + String operationId, + NexusClientCallsInterceptor.StartNexusOperationExecutionOutput startOutput) { + this.client = client; + this.endpoint = endpoint; + this.operationId = operationId; + this.startOutput = startOutput; + } + } + + private StartedOperation startOperation() { + return startOperation(null); + } + + private StartedOperation startOperation(@javax.annotation.Nullable String inputOverride) { + NexusClient client = createNexusClient(); + Endpoint endpoint = createEndpoint("test-endpoint-" + testWorkflowRule.getTaskQueue()); + String operationId = "nexus-handle-test-" + UUID.randomUUID(); + String inputValue = inputOverride != null ? inputOverride : "ping-" + operationId; + + Payload inputPayload = + testWorkflowRule + .getWorkflowClient() + .getOptions() + .getDataConverter() + .toPayload(inputValue) + .orElseThrow(() -> new AssertionError("DataConverter returned no payload")); + + NexusClientCallsInterceptor.StartNexusOperationExecutionOutput startOutput = + client.startNexusOperationExecution( + new NexusClientCallsInterceptor.StartNexusOperationExecutionInput( + operationId, + endpoint.getSpec().getName(), + TestNexusServices.TestNexusService1.class.getSimpleName(), + "operation", + Duration.ofSeconds(30), + inputPayload, + /* searchAttributes= */ null, + /* nexusHeader= */ null)); + + Assert.assertNotNull("expected start to return a run ID", startOutput.getRunId()); + return new StartedOperation(client, endpoint, operationId, startOutput); + } + + private void cleanup(StartedOperation started) { + // Best-effort: server may reject delete depending on operation state. + try { + started.client.getHandle(started.operationId, started.startOutput.getRunId()).delete(); + } catch (RuntimeException ignored) { + // ignored + } + deleteEndpoint(started.endpoint); + } + + private Endpoint createEndpoint(String name) { + EndpointSpec spec = + EndpointSpec.newBuilder() + .setName(name) + .setDescription( + Payload.newBuilder().setData(ByteString.copyFromUtf8("test endpoint")).build()) + .setTarget( + EndpointTarget.newBuilder() + .setWorker( + EndpointTarget.Worker.newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setTaskQueue(testWorkflowRule.getTaskQueue()))) + .build(); + CreateNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .createNexusEndpoint(CreateNexusEndpointRequest.newBuilder().setSpec(spec).build()); + return resp.getEndpoint(); + } + + private void deleteEndpoint(Endpoint endpoint) { + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .deleteNexusEndpoint( + DeleteNexusEndpointRequest.newBuilder() + .setId(endpoint.getId()) + .setVersion(endpoint.getVersion()) + .build()); + } + + public static class PlaceholderWorkflowImpl implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + return input; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class TestNexusServiceImpl { + /** Inputs starting with this prefix make the handler throw, exercising the failure path. */ + static final String FAIL_PREFIX = "FAIL:"; + + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync( + (context, details, input) -> { + if (input != null && input.startsWith(FAIL_PREFIX)) { + // OperationException.failed = definitive failure (no retries) so the caller's + // getResult surfaces the failure instead of timing out. + throw OperationException.failed("intentional failure: " + input); + } + return "echo:" + (input == null ? "" : input); + }); + } + } + + @Test + public void getResultPropagatesOperationFailure() { + StartedOperation started = startOperation(TestNexusServiceImpl.FAIL_PREFIX + "boom"); + try { + UntypedNexusClientHandle handle = + started.client.getHandle(started.operationId, started.startOutput.getRunId()); + + try { + handle.getResult(String.class); + Assert.fail("expected getResult to throw because the operation handler failed"); + } catch (RuntimeException e) { + // The DataConverter wraps the proto Failure into a Java exception. Either the message + // carries the handler's reason, or one of the cause links does. + String combined = collectMessages(e); + Assert.assertTrue( + "expected exception chain to mention the handler failure, got: " + combined, + combined.contains("intentional failure")); + } + } finally { + cleanup(started); + } + } + + private static String collectMessages(Throwable t) { + StringBuilder sb = new StringBuilder(); + for (Throwable c = t; c != null; c = c.getCause()) { + sb.append(c.getClass().getSimpleName()).append(":").append(c.getMessage()).append(" | "); + if (c.getCause() == c) { + break; + } + } + return sb.toString(); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientInterceptorChainTest.java b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientInterceptorChainTest.java new file mode 100644 index 000000000..39380a90c --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientInterceptorChainTest.java @@ -0,0 +1,108 @@ +package io.temporal.client.nexus; + +import io.temporal.client.NexusClient; +import io.temporal.client.NexusClientImpl; +import io.temporal.client.NexusClientOperationOptions; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptorBase; +import io.temporal.common.interceptors.NexusClientInterceptor; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestWorkflows; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +/** + * Verifies that user-registered {@link NexusClientInterceptor}s are wrapped around the root invoker + * in registration order (last registered = outermost), and that every per-call operation passes + * through every interceptor. + */ +public class NexusClientInterceptorChainTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(PlaceholderWorkflowImpl.class) + .setTestTimeoutSeconds(60) + .build(); + + @Test + public void registeredInterceptorsAreCalledInOrder() { + List calls = Collections.synchronizedList(new ArrayList<>()); + NexusClientInterceptor first = next -> new RecordingCallsInterceptor("first", next, calls); + NexusClientInterceptor second = next -> new RecordingCallsInterceptor("second", next, calls); + + NexusClient client = + NexusClientImpl.newInstance( + testWorkflowRule.getWorkflowServiceStubs(), + NexusClientOperationOptions.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setInterceptors(Arrays.asList(first, second)) + .build()); + + client.listNexusOperationExecutions(new ListNexusOperationExecutionsInput(null, 100, null)); + client.countNexusOperationExecutions(new CountNexusOperationExecutionsInput(null)); + + // [first, second] -> second wraps first wraps root. + // A call enters second, descends to first, then root, returns through first then second. + Assert.assertEquals( + Arrays.asList( + "second:list:before", + "first:list:before", + "first:list:after", + "second:list:after", + "second:count:before", + "first:count:before", + "first:count:after", + "second:count:after"), + calls); + } + + static class RecordingCallsInterceptor extends NexusClientCallsInterceptorBase { + private final String name; + private final List calls; + + RecordingCallsInterceptor(String name, NexusClientCallsInterceptor next, List calls) { + super(next); + this.name = name; + this.calls = calls; + } + + @Override + public ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input) { + calls.add(name + ":list:before"); + try { + return super.listNexusOperationExecutions(input); + } finally { + calls.add(name + ":list:after"); + } + } + + @Override + public CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input) { + calls.add(name + ":count:before"); + try { + return super.countNexusOperationExecutions(input); + } finally { + calls.add(name + ":count:after"); + } + } + } + + public static class PlaceholderWorkflowImpl implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + return input; + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientTest.java b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientTest.java new file mode 100644 index 000000000..9f97c4761 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientTest.java @@ -0,0 +1,238 @@ +package io.temporal.client.nexus; + +import com.google.protobuf.ByteString; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.api.nexus.v1.EndpointSpec; +import io.temporal.api.nexus.v1.EndpointTarget; +import io.temporal.api.nexus.v1.NexusOperationExecutionListInfo; +import io.temporal.api.operatorservice.v1.CreateNexusEndpointRequest; +import io.temporal.api.operatorservice.v1.CreateNexusEndpointResponse; +import io.temporal.api.operatorservice.v1.DeleteNexusEndpointRequest; +import io.temporal.client.NexusClient; +import io.temporal.client.NexusClientImpl; +import io.temporal.client.NexusClientOperationOptions; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class NexusClientTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(NexusClientTest.PlaceholderWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + // Default is 10s; standalone Nexus dispatch + worker poll can take longer. + .setTestTimeoutSeconds(120) + .build(); + + private NexusClient createNexusClient() { + return NexusClientImpl.newInstance( + testWorkflowRule.getWorkflowServiceStubs(), + NexusClientOperationOptions.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .build()); + } + + @Test + public void listNexusOperationExecutions() { + NexusClient client = createNexusClient(); + NexusClientCallsInterceptor.ListNexusOperationExecutionsInput input = + new NexusClientCallsInterceptor.ListNexusOperationExecutionsInput(null, 100, null); + + NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput output = + client.listNexusOperationExecutions(input); + + Assert.assertNotNull(output); + Assert.assertNotNull(output.getOperations()); + Assert.assertNotNull(output.getNextPageToken()); + } + + @Test + public void countNexusOperationExecutions() { + // Just run a basic test to see if it works + countNexusOperations(); + } + + public long countNexusOperations() { + NexusClient client = createNexusClient(); + NexusClientCallsInterceptor.CountNexusOperationExecutionsInput input = + new NexusClientCallsInterceptor.CountNexusOperationExecutionsInput(null); + + NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput output = + client.countNexusOperationExecutions(input); + + Assert.assertNotNull(output); + Assert.assertTrue(output.getCount() >= 0); + Assert.assertNotNull(output.getGroups()); + + return output.getCount(); + } + + @Test + public void runStandaloneNexusOperation() throws Exception { + TestNexusServiceImpl.received = new java.util.concurrent.CompletableFuture<>(); + TestNexusServiceImpl.invocationCount.set(0); + + Endpoint endpoint = createEndpoint("test-endpoint-" + testWorkflowRule.getTaskQueue()); + String operationId = "nexus-test-" + UUID.randomUUID(); + String inputValue = "ping-" + operationId; + NexusClient client = createNexusClient(); + + try { + Payload inputPayload = + testWorkflowRule + .getWorkflowClient() + .getOptions() + .getDataConverter() + .toPayload(inputValue) + .orElseThrow(() -> new AssertionError("DataConverter returned no payload")); + + NexusClientCallsInterceptor.StartNexusOperationExecutionOutput startOutput = + client.startNexusOperationExecution( + new NexusClientCallsInterceptor.StartNexusOperationExecutionInput( + operationId, + endpoint.getSpec().getName(), + TestNexusServices.TestNexusService1.class.getSimpleName(), + "operation", + Duration.ofSeconds(30), + inputPayload, + /* searchAttributes= */ null, + /* nexusHeader= */ null)); + + // Sync handler: wait for the input to land in the test side-channel; that's how we + // know the operation actually completed on the worker. + String observed; + try { + observed = TestNexusServiceImpl.received.get(60, TimeUnit.SECONDS); + } catch (java.util.concurrent.TimeoutException e) { + Assert.fail( + "Nexus handler was never invoked within 60s. invocationCount=" + + TestNexusServiceImpl.invocationCount.get()); + throw new AssertionError("unreachable"); + } + Assert.assertEquals( + "expected the Nexus handler to receive the same input we sent", inputValue, observed); + + // Poll the list until our operationId appears. This also tests that the list operation + // works correctly. + NexusOperationExecutionListInfo listed = + waitForListedOperation(client, operationId, Duration.ofSeconds(15)); + Assert.assertNotNull( + "expected operationId " + operationId + " to appear in listNexusOperationExecutions", + listed); + Assert.assertEquals(operationId, listed.getOperationId()); + Assert.assertEquals(endpoint.getSpec().getName(), listed.getEndpoint()); + Assert.assertEquals( + TestNexusServices.TestNexusService1.class.getSimpleName(), listed.getService()); + Assert.assertEquals("operation", listed.getOperation()); + + // We know count should be at least 1 until we clean up + // Due to race conditions with other tests running, we don't know what it actually should be + // though - + // but this is a chance to assert that it at least returns a non-zero value when appropriate + Assert.assertTrue(countNexusOperations() >= 1); + + // Best-effort cleanup of the operation execution itself. + try { + client.getHandle(operationId, startOutput.getRunId()).delete(); + } catch (RuntimeException ignored) { + // Server may reject delete depending on operation state. + } + } finally { + deleteEndpoint(endpoint); + } + } + + private NexusOperationExecutionListInfo waitForListedOperation( + NexusClient client, String operationId, Duration timeout) throws InterruptedException { + long deadlineNanos = System.nanoTime() + timeout.toNanos(); + NexusClientCallsInterceptor.ListNexusOperationExecutionsInput listInput = + new NexusClientCallsInterceptor.ListNexusOperationExecutionsInput(null, 100, null); + while (System.nanoTime() < deadlineNanos) { + NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput out = + client.listNexusOperationExecutions(listInput); + for (NexusOperationExecutionListInfo info : out.getOperations()) { + if (operationId.equals(info.getOperationId())) { + return info; + } + } + Thread.sleep(500); + } + return null; + } + + private Endpoint createEndpoint(String name) { + EndpointSpec spec = + EndpointSpec.newBuilder() + .setName(name) + .setDescription( + Payload.newBuilder().setData(ByteString.copyFromUtf8("test endpoint")).build()) + .setTarget( + EndpointTarget.newBuilder() + .setWorker( + EndpointTarget.Worker.newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setTaskQueue(testWorkflowRule.getTaskQueue()))) + .build(); + CreateNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .createNexusEndpoint(CreateNexusEndpointRequest.newBuilder().setSpec(spec).build()); + return resp.getEndpoint(); + } + + private void deleteEndpoint(Endpoint endpoint) { + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .deleteNexusEndpoint( + DeleteNexusEndpointRequest.newBuilder() + .setId(endpoint.getId()) + .setVersion(endpoint.getVersion()) + .build()); + } + + public static class PlaceholderWorkflowImpl implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + return input; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class TestNexusServiceImpl { + // CompletableFuture (not BlockingQueue) so we can record a null input — the worker may + // legitimately deliver a null payload, and we want a clean assertion failure instead of a + // NullPointerException-driven retry storm. Reassigned per test in a @Before-style reset. + static volatile java.util.concurrent.CompletableFuture received = + new java.util.concurrent.CompletableFuture<>(); + static final java.util.concurrent.atomic.AtomicInteger invocationCount = + new java.util.concurrent.atomic.AtomicInteger(); + + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync( + (context, details, input) -> { + invocationCount.incrementAndGet(); + // complete() ignores subsequent calls, so the first delivered input wins. + received.complete(input); + return "echo:" + (input == null ? "" : input); + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusServiceClientTest.java b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusServiceClientTest.java new file mode 100644 index 000000000..e7fcdfb51 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusServiceClientTest.java @@ -0,0 +1,267 @@ +package io.temporal.client.nexus; + +import com.google.protobuf.ByteString; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.api.nexus.v1.EndpointSpec; +import io.temporal.api.nexus.v1.EndpointTarget; +import io.temporal.api.operatorservice.v1.CreateNexusEndpointRequest; +import io.temporal.api.operatorservice.v1.CreateNexusEndpointResponse; +import io.temporal.api.operatorservice.v1.DeleteNexusEndpointRequest; +import io.temporal.client.NexusClientHandle; +import io.temporal.client.NexusClientOperationOptions; +import io.temporal.client.NexusServiceClient; +import io.temporal.common.SearchAttributeKey; +import io.temporal.common.SearchAttributes; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptorBase; +import io.temporal.common.interceptors.NexusClientInterceptor; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +/** + * End-to-end tests for {@link NexusServiceClient}: typed start/execute via {@link + * java.util.function.BiFunction} method references, and the auto-scoped {@code list}/{@code count} + * inherited from {@link io.temporal.client.UntypedNexusServiceClient}. + */ +public class NexusServiceClientTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(PlaceholderWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .setTestTimeoutSeconds(120) + .build(); + + @Test + public void executeReturnsTypedResult() { + Endpoint endpoint = createEndpoint("svc-execute-" + testWorkflowRule.getTaskQueue()); + try { + NexusServiceClient client = buildServiceClient(endpoint); + + String result = client.execute(TestNexusServices.TestNexusService1::operation, "hello"); + + Assert.assertEquals("echo:hello", result); + } finally { + deleteEndpoint(endpoint); + } + } + + @Test + public void startReturnsTypedHandleAndPollsResult() { + Endpoint endpoint = createEndpoint("svc-start-" + testWorkflowRule.getTaskQueue()); + try { + NexusServiceClient client = buildServiceClient(endpoint); + + NexusClientHandle handle = + client.start(TestNexusServices.TestNexusService1::operation, "world"); + + Assert.assertNotNull(handle.getNexusOperationId()); + Assert.assertEquals("echo:world", handle.getResult()); + } finally { + deleteEndpoint(endpoint); + } + } + + @Test + public void listAndCountAreScopedToService() throws Exception { + Endpoint endpoint = createEndpoint("svc-scoped-" + testWorkflowRule.getTaskQueue()); + try { + NexusServiceClient client = buildServiceClient(endpoint); + + // Start at least one operation so the service has a nonzero count. + String executed = + client.execute(TestNexusServices.TestNexusService1::operation, "scoped-list-test"); + Assert.assertEquals("echo:scoped-list-test", executed); + + // Untyped count: scoped to TestNexusService1 by the impl. Don't assert exactness — other + // tests may share the namespace — only that it's non-negative and the response shape is + // intact. (After the executed op, count >= 1.) + CountNexusOperationExecutionsOutput count = + client.countNexusOperationExecutions(new CountNexusOperationExecutionsInput(null)); + Assert.assertNotNull(count); + Assert.assertTrue(count.getCount() >= 1); + + // Untyped list: scoped to TestNexusService1. All returned entries should be for that + // service (not, e.g., TestNexusService2 if other tests are running). + ListNexusOperationExecutionsOutput list = + client.listNexusOperationExecutions( + new ListNexusOperationExecutionsInput(null, 100, null)); + Assert.assertNotNull(list); + Assert.assertNotNull(list.getOperations()); + String expectedService = TestNexusServices.TestNexusService1.class.getSimpleName(); + list.getOperations() + .forEach( + info -> + Assert.assertEquals( + "list result not scoped to expected service", + expectedService, + info.getService())); + } finally { + deleteEndpoint(endpoint); + } + } + + @Test + public void clientSummaryIsForwardedIntoStartInput() { + AtomicReference captured = new AtomicReference<>(); + RuntimeException sentinel = new RuntimeException("captured-by-test"); + + NexusClientInterceptor recordingFactory = + next -> + new NexusClientCallsInterceptorBase(next) { + @Override + public StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input) { + captured.set(input); + throw sentinel; + } + }; + + NexusServiceClient client = + NexusServiceClient.newInstance( + TestNexusServices.TestNexusService1.class, + "summary-test-endpoint", + testWorkflowRule.getWorkflowServiceStubs(), + NexusClientOperationOptions.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setSummary("client-default-summary") + .setInterceptors(Collections.singletonList(recordingFactory)) + .build()); + + try { + client.start(TestNexusServices.TestNexusService1::operation, "ignored"); + Assert.fail("expected sentinel to be thrown by recording interceptor"); + } catch (RuntimeException e) { + Assert.assertSame(sentinel, e); + } + + StartNexusOperationExecutionInput input = captured.get(); + Assert.assertNotNull("interceptor should have captured a start input", input); + Assert.assertTrue( + "expected summary to be present on the start input", input.getSummary().isPresent()); + Assert.assertEquals("client-default-summary", input.getSummary().get()); + } + + @Test + public void clientSearchAttributesAreEncodedIntoStartInput() { + SearchAttributeKey customKey = SearchAttributeKey.forKeyword("CustomNexusTestKey"); + SearchAttributes attrs = SearchAttributes.newBuilder().set(customKey, "expected-value").build(); + + AtomicReference captured = new AtomicReference<>(); + RuntimeException sentinel = new RuntimeException("captured-by-test"); + + NexusClientInterceptor recordingFactory = + next -> + new NexusClientCallsInterceptorBase(next) { + @Override + public StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input) { + captured.set(input); + throw sentinel; + } + }; + + NexusServiceClient client = + NexusServiceClient.newInstance( + TestNexusServices.TestNexusService1.class, + "search-attrs-test-endpoint", + testWorkflowRule.getWorkflowServiceStubs(), + NexusClientOperationOptions.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setSearchAttributes(attrs) + .setInterceptors(Collections.singletonList(recordingFactory)) + .build()); + + try { + client.start(TestNexusServices.TestNexusService1::operation, "ignored"); + Assert.fail("expected sentinel to be thrown by recording interceptor"); + } catch (RuntimeException e) { + Assert.assertSame(sentinel, e); + } + + StartNexusOperationExecutionInput input = captured.get(); + Assert.assertNotNull("interceptor should have captured a start input", input); + Assert.assertTrue( + "expected proto search attributes to be present", input.getSearchAttributes().isPresent()); + Assert.assertTrue( + "expected the custom keyword to be present in encoded search attributes", + input.getSearchAttributes().get().containsIndexedFields("CustomNexusTestKey")); + } + + private NexusServiceClient buildServiceClient( + Endpoint endpoint) { + return NexusServiceClient.newInstance( + TestNexusServices.TestNexusService1.class, + endpoint.getSpec().getName(), + testWorkflowRule.getWorkflowServiceStubs(), + io.temporal.client.NexusClientOperationOptions.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .build()); + } + + private Endpoint createEndpoint(String name) { + EndpointSpec spec = + EndpointSpec.newBuilder() + .setName(name) + .setDescription( + Payload.newBuilder().setData(ByteString.copyFromUtf8("test endpoint")).build()) + .setTarget( + EndpointTarget.newBuilder() + .setWorker( + EndpointTarget.Worker.newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setTaskQueue(testWorkflowRule.getTaskQueue()))) + .build(); + CreateNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .createNexusEndpoint(CreateNexusEndpointRequest.newBuilder().setSpec(spec).build()); + return resp.getEndpoint(); + } + + private void deleteEndpoint(Endpoint endpoint) { + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .deleteNexusEndpoint( + DeleteNexusEndpointRequest.newBuilder() + .setId(endpoint.getId()) + .setVersion(endpoint.getVersion()) + .build()); + } + + public static class PlaceholderWorkflowImpl implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + return input; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync( + (context, details, input) -> "echo:" + (input == null ? "" : input)); + } + } +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index a1cf4e111..ba45f5251 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -621,7 +621,7 @@ public void completeWorkflowTask( public void applyOnConflictOptions(@Nonnull StartWorkflowExecutionRequest request) { update( ctx -> { - OnConflictOptions options = request.getOnConflictOptions(); + io.temporal.api.workflow.v1.OnConflictOptions options = request.getOnConflictOptions(); String requestId = null; List completionCallbacks = null; List links = null;