Implement SignalWithStart as a system nexus endpoint#9833
Conversation
bergundy
left a comment
There was a problem hiding this comment.
Overall LGTM. Make sure to unit test the validator and I would revert the changes to the error strings that I made in my prototype. It's best not to change the existing experience for all of the common workflow APIs.
| s.Nil(failure) | ||
| s.True(resp.Started, "expected Started=true when starting a new workflow") | ||
| s.NotEmpty(resp.RunId) | ||
| defer func() { |
There was a problem hiding this comment.
Doesn't make to put the defer at the bottom here.
| } | ||
|
|
||
| // SignalWithStartWorkflowExecution implements the SignalWithStartWorkflowExecution Nexus operation. | ||
| func (h *workflowServiceNexusHandler) SignalWithStartWorkflowExecution(name string) nexus.Operation[*workflowservice.SignalWithStartWorkflowExecutionRequest, *workflowservice.SignalWithStartWorkflowExecutionResponse] { |
There was a problem hiding this comment.
No need for this function, change this to register below avoid the closure:
nexus.NewSyncOperation(workflowservicenexus.WorkflowService.SignalWithStartWorkflowExecution.Name(), h.signalWithStartWorkflowExecution)
… how nexus op is registered
bergundy
left a comment
There was a problem hiding this comment.
Approved with some small comments.
| // HistoryHandlerModule wires the workflow library's Nexus handler to the | ||
| // history service. Only include this in services that provide | ||
| // historyservice.HistoryServiceServer (the history service). | ||
| var HistoryHandlerModule = fx.Invoke(func(library *library, historyHandler historyservice.HistoryServiceServer) { |
There was a problem hiding this comment.
nit: should this be an fx.Module?
| "go.temporal.io/server/common/searchattribute" | ||
| ) | ||
|
|
||
| var ErrSystemNexusOperationsDisabled = serviceerror.NewUnimplemented("System Nexus operations are disabled") |
There was a problem hiding this comment.
Make this specific to the signal with start operation. We will want to enable each operation separately same way that we do for gRPC methods.
System nexus will already error out when a handler for the given operation isn't registered during command processing (or task execution, not sure exactly which one).
| if err != nil { | ||
| return nil, err | ||
| } | ||
| link := commonnexus.ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{ |
There was a problem hiding this comment.
The backlink should be generated in the history handler so it can ensure that the request ID is properly mapped to an event ID in order for the UI to properly follow the link (the mapping is available as part of the DescribeWorkflowExecution response for other events).
I missed this in the initial review, that's something I haven't completed when I handed over the work to you. I don't want to block the PR for this because it's a non-trivial amount of work that the Nexus team was going to tackle anyways.
| if err != nil { | ||
| var notFoundErr *serviceerror.NotFound | ||
| var invalidArgumentErr *serviceerror.InvalidArgument | ||
| if errors.As(err, ¬FoundErr) || errors.As(err, &invalidArgumentErr) { |
There was a problem hiding this comment.
nit: Use the more modern errors.AsType.
| currentWorkflowLease.GetMutableState().IsWorkflowExecutionRunning() && | ||
| signalWithStartRequest.WorkflowIdConflictPolicy != enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING { | ||
| signalWithStartRequest.WorkflowIdConflictPolicy != enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING && | ||
| signalWithStartRequest.WorkflowIdConflictPolicy != enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL { |
There was a problem hiding this comment.
Maybe change this to explicitly check for use existing?
| switch r := result.(type) { | ||
| case interface{ ValueAsAny() any }: | ||
| ps, err := payloads.Encode(r.ValueAsAny()) | ||
| ps, err := sdkconverter.PreferProtoDataConverter.ToPayloads(r.ValueAsAny()) |
There was a problem hiding this comment.
Are we aligned with UI and SDK that we are going to use binary proto for responses?
lina-temporal
left a comment
There was a problem hiding this comment.
few comments mostly for my comprehension, approved
| // desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, response.RunID) | ||
| // s.NoError(err) | ||
| // requestIDInfos := desc.GetWorkflowExtendedInfo().GetRequestIdInfos() | ||
| // requestID := slices.Collect(maps.Keys(requestIDInfos))[0] | ||
| // s.Equal(opScheduledEvent.GetNexusOperationScheduledEventAttributes().GetRequestId(), requestID) |
There was a problem hiding this comment.
Why's this commented out?
| migration.Module, | ||
| resource.Module, | ||
| deletenamespace.Module, | ||
| chasmscheduler.Module, |
There was a problem hiding this comment.
Why do these have to be included? Worker shouldn't need chasmscheduler's module injected.
There was a problem hiding this comment.
Without these the containers fail to start with:
logger.go:146: 2026-05-20T15:21:09.404Z panic FATAL: unable to start worker service {"error": "could not build arguments for function \"go.temporal.io/server/service/worker\".ServiceLifetimeHooks (/home/runner/work/temporal/temporal/service/worker/fx.go:192): failed to build *worker.Service: could not build arguments for function \"go.temporal.io/server/service/worker\".NewService (/home/runner/work/temporal/temporal/service/worker/service.go:111): failed to build *worker.PerNamespaceWorkerManager: could not build arguments for function \"go.temporal.io/server/service/worker\".PerNamespaceWorkerManagerProvider (/home/runner/work/temporal/temporal/service/worker/fx.go:207): could not build value group common.PerNSWorkerComponent[group=\"perNamespaceWorkerComponent\"]: missing dependencies for function \"go.temporal.io/server/service/worker/scheduler\".NewResult (/home/runner/work/temporal/temporal/service/worker/scheduler/fx.go:81): missing type: *scheduler.SpecBuilder", "errorVerbose": "could not build arguments for function \"go.temporal.io/server/service/worker\".ServiceLifetimeHooks\n\t/home/runner/work/temporal/temporal/service/worker/fx.go:192:\nfailed to build *worker.Service:\ncould not build arguments for function \"go.temporal.io/server/service/worker\".NewService\n\t/home/runner/work/temporal/temporal/service/worker/service.go:111:\nfailed to build *worker.PerNamespaceWorkerManager:\ncould not build arguments for function \"go.temporal.io/server/service/worker\".PerNamespaceWorkerManagerProvider\n\t/home/runner/work/temporal/temporal/service/worker/fx.go:207:\ncould not build value group common.PerNSWorkerComponent[group=\"perNamespaceWorkerComponent\"]:\nmissing dependencies for function \"go.temporal.io/server/service/worker/scheduler\".NewResult\n\t/home/runner/work/temporal/temporal/service/worker/scheduler/fx.go:81:\nmissing type:\n\t- *scheduler.SpecBuilder (did you mean to Provide it?)", "host": "127.0.0.1:42869", "logging-call-at": "/home/runner/work/temporal/temporal/tests/testcore/onebox.go:663"}
| scheduler.Module, | ||
| callback.Module, |
There was a problem hiding this comment.
Why does matching need these now?
| ) | ||
|
|
||
| type Config struct { | ||
| maxIDLengthLimit dynamicconfig.IntPropertyFn |
There was a problem hiding this comment.
IMO this (and maybe linkMaxSize) could be simple constants, since we probably wouldn't want these particular limits for specific customers. Up to you.
What changed?
This PR adds
SignalWithStartWorkflowExecutionas a synchronous Nexus operation exposed via__temporal_system endpoint, allowing workflows to signal-with-start other workflows through the CHASM Nexus operation framework.Key changes:
chasm/lib/workflow/nexus_service.go:workflowServiceNexusHandlerimplements theSignalWithStartWorkflowExecutionNexus sync operation by resolving the namespace and delegating to the History service. ASignalWithStartOperationProcessorhandles input enrichment (namespace, request ID, links) and routing via CHASM'sNexusOperationProcessorResult.chasm/lib/workflow/library.go— library now holds theworkflowServiceNexusHandler, the workflowConfig, the SA mapper provider, and the SA validator.newLibrary(used by fx) takes those dependencies, while the publicNewLibrarykeeps its old signature for external callers. AddsNexusServices()so the library registers its Nexus service via CHASM.chasm/lib/workflow/validator.go:RequestValidatorconsolidates theSignalWithStartWorkflowExecutionvalidation logic (previously inlined inWorkflowHandler) into a reusable, injectable struct. This same validator is used by both the frontend handler and the new CHASM processor.EnableSignalWithStartFromWorkflow(namespace-scoped, default false).service/frontend/workflow_handler.go: Removed theSignalWithStartvalidation blockservice/history/fx.go: Provides aHistoryServiceServerProviderso the CHASM workflow library can call the history handler directly.temporal/fx.go: Removes the now-redundantChasmLibraryOptionsgrouping; each service module registers its own CHASM libraries.components/nexusoperations/workflow/commands.go:NotFoundandInvalidArgumenterrors during Nexus command handling are now surfaced as workflow task failures instead of being treated as transient handler errors.common/payloads: AddsEncodeSingle,MustEncodeSingle, andMustEncodehelpers used in tests.cmd/tools/getproto: Adds support for nexus-proto-annotations proto imports.tests/signal_with_start_from_workflow_test.go: Functional test suite covering the happy path, duplicate detection, conflict policies, and validation rejection for the new Nexus operation.Why?
This functionality is one of our most requested GitHub issues.
How did you test it?
Potential risks
The history service now directly exposes a
HistoryServiceServerinterface viafxfor injection into the CHASM workflow library. This tight coupling between the CHASM workflow library and the history handler could complicate future layering — callers outside the history service should not adopt this pattern. The feature is gated byhistory.enableSignalWithStartFromWorkflowfor rollout.