Nexus chasm cleanup #9843
- Introduce RetryPolicyConfig struct and typed DC setting (replaces separate initial/max interval settings)
- Rename MaxConcurrentOperations to MaxConcurrentOperationsPerWorkflow with updated DC key
- Fix CallbackURLTemplate converter to use direct type assertion
- Add PrincipalType/PrincipalName to disallowed operation headers
Proto schema:
- Add started_time and closed_time to OperationState
- Remove attempt from timeout task messages (only invocation/backoff tasks track attempt)
- Reorder task messages (timeouts first, then invocation)
State machine:
- Increment attempt on schedule/reschedule (before task emission) rather than after attempt completion; this makes the attempt number approximate but available in the task
- Track StartedTime and ClosedTime in transitions
- Support explicit StartTime/CompleteTime in events for async completion race conditions
- Propagate Failure through Failed/Canceled transitions
- Add resolveUnsuccessfully() for terminal state bookkeeping
- Clear NextAttemptScheduleTime in all terminal transitions
- Base start-to-close timeout on actual start time, not schedule time
- Move RequestedTime assignment to Operation.Cancel()
Task handlers:
- Privatize all task handler types, options, and constructors
- Inject task handlers into Library via constructor
- Extract invocation interface with HTTP, system, and timeout implementations
- Unify error classification in newInvocationResult()
- Simplify Validate to return (bool, nil) for invalid states
- Fix OnCancelled -> OnCanceled spelling
- Fix fx module name
Workflow registry:
- Add Library interface with CommandHandlers() and EventDefinitions()
- Replace individual Register* methods with single Register(Library)
- Add EventDefinitionByGoType[D] for type-safe event lookup
- Add generic AddAndApplyHistoryEvent[D] free function
- Rename EventDefinition() to EventDefinitionByEventType()
- Privatize workflow Library struct, expose NewLibrary constructor
- Use serviceerror.NewInternalf for internal errors
Nexus workflow:
- Add library type implementing workflow Library interface
- Replace registerCommandHandlers/registerEvents with Library methods
- Use type-safe AddAndApplyHistoryEvent[D] in command handlers
- Consolidate two fx.Invoke calls into one
- Set initial Attempt to 0 (TransitionScheduled now increments)
- Rename chasmCtx to ctx
- Fix OnNexusOperationCancelled -> OnNexusOperationCanceled spelling
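The single Register(Library) entry point described above could look roughly like the sketch below. Only the names Library, CommandHandlers(), EventDefinitions(), Register, and EventDefinitionByEventType come from the description; the return types, the CommandHandler signature, the Registry struct, and the example nexusLibrary and scheduledEvent types are assumptions made for illustration, and fmt.Errorf stands in for serviceerror.NewInternalf.

```go
package main

import "fmt"

// CommandHandler and EventDefinition are simplified, hypothetical stand-ins.
type CommandHandler func(command string) error

type EventDefinition interface {
	EventType() string
}

// Library bundles what a component contributes to the registry.
// The method names follow the PR description; the return types are assumed.
type Library interface {
	CommandHandlers() map[string]CommandHandler
	EventDefinitions() []EventDefinition
}

// Registry is a hypothetical registry collecting everything libraries provide.
type Registry struct {
	commandHandlers map[string]CommandHandler
	eventsByType    map[string]EventDefinition
}

func NewRegistry() *Registry {
	return &Registry{
		commandHandlers: make(map[string]CommandHandler),
		eventsByType:    make(map[string]EventDefinition),
	}
}

// Register adds all handlers and definitions of a library and returns an
// error on the first duplicate it encounters.
func (r *Registry) Register(lib Library) error {
	for name, h := range lib.CommandHandlers() {
		if _, ok := r.commandHandlers[name]; ok {
			return fmt.Errorf("command handler %q already registered", name)
		}
		r.commandHandlers[name] = h
	}
	for _, def := range lib.EventDefinitions() {
		if _, ok := r.eventsByType[def.EventType()]; ok {
			return fmt.Errorf("event definition %q already registered", def.EventType())
		}
		r.eventsByType[def.EventType()] = def
	}
	return nil
}

// EventDefinitionByEventType looks up a definition by its event type name.
func (r *Registry) EventDefinitionByEventType(eventType string) (EventDefinition, bool) {
	def, ok := r.eventsByType[eventType]
	return def, ok
}

// scheduledEvent and nexusLibrary are hypothetical example implementations.
type scheduledEvent struct{}

func (scheduledEvent) EventType() string { return "NexusOperationScheduled" }

type nexusLibrary struct{}

func (nexusLibrary) CommandHandlers() map[string]CommandHandler {
	return map[string]CommandHandler{
		"ScheduleNexusOperation": func(string) error { return nil },
	}
}

func (nexusLibrary) EventDefinitions() []EventDefinition {
	return []EventDefinition{scheduledEvent{}}
}

func main() {
	r := NewRegistry()
	if err := r.Register(nexusLibrary{}); err != nil {
		fmt.Println("register failed:", err)
		return
	}
	def, ok := r.EventDefinitionByEventType("NexusOperationScheduled")
	fmt.Println(ok, def.EventType())
}
```

With one Register call per library, adding a component does not require touching the registry's method set.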
func newLibrary() *Library {
	return &Library{}
func newLibrary(
@stephanos I left it up to you to break out the components-only library.
@stephanos there are some key behavior changes here: attempt is now incremented on schedule, and start and close times can be provided externally (needed for async completion).
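A rough sketch of these behavior changes, not the PR's actual code: the operation struct and the schedule, markStarted, and startToCloseDeadline helpers are hypothetical, and plain time.Time is assumed in place of protobuf timestamps.

```go
package main

import (
	"fmt"
	"time"
)

// operation is a hypothetical, simplified stand-in for the operation state.
type operation struct {
	Attempt     int32
	StartedTime *time.Time
}

// schedule increments the attempt before a task would be emitted, so the task
// can carry the attempt number. The count is approximate because it reflects
// scheduled attempts rather than completed ones.
func (o *operation) schedule() int32 {
	o.Attempt++
	return o.Attempt
}

// markStarted records the start time, preferring an externally provided time
// (for example one reported by an async completion) over the current time.
func (o *operation) markStarted(now time.Time, explicit *time.Time) {
	started := now
	if explicit != nil {
		started = *explicit
	}
	o.StartedTime = &started
}

// startToCloseDeadline derives the deadline from the actual start time rather
// than the schedule time. It reports false if the operation has not started.
func (o *operation) startToCloseDeadline(timeout time.Duration) (time.Time, bool) {
	if o.StartedTime == nil {
		return time.Time{}, false
	}
	return o.StartedTime.Add(timeout), true
}

func main() {
	op := &operation{} // Attempt starts at 0; scheduling increments it.
	fmt.Println("attempt:", op.schedule())
	fmt.Println("attempt:", op.schedule())

	now := time.Now()
	asyncStart := now.Add(-2 * time.Second)
	op.markStarted(now, &asyncStart)
	if deadline, ok := op.startToCloseDeadline(10 * time.Second); ok {
		fmt.Println("time left:", deadline.Sub(now))
	}
}
```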
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
	"nexusoperation.limit.operation.concurrency",
var MaxConcurrentOperationsPerWorkflow = dynamicconfig.NewNamespaceIntSetting(
Do we have/need a limit for standalone?
No, this is just pending nexus operations in a single workflow.
chasm/lib/nexusoperation/config.go
Outdated
RequestTimeout: RequestTimeout.Get(dc),
MinRequestTimeout: MinRequestTimeout.Get(dc),
MaxConcurrentOperations: MaxConcurrentOperations.Get(dc),
MaxConcurrentOperations: MaxConcurrentOperationsPerWorkflow.Get(dc),
Should we rename the field, too?
func (w *Workflow) AddAndApplyHistoryEvent(
// AddAndApplyHistoryEvent adds a history event to the workflow and applies the corresponding event definition,
// looked up by Go type. This is the preferred way to add and apply events as it provides go-to-definition navigation.
func AddAndApplyHistoryEvent[D EventDefinition](
Yeah, but we should consider moving all of the workflow events into chasm/lib/workflow so we can actually refer to them, otherwise we run into circular dependency issues that are annoying to break.
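For context, a lookup keyed by Go type can be sketched as below. This only illustrates the general technique; the registry type, its register method, and the example event types are hypothetical, and the real signatures of EventDefinitionByGoType[D] and AddAndApplyHistoryEvent[D] are not shown in this thread. It also shows why the circular dependency matters: the caller has to name the concrete definition type, so that type must be importable.

```go
package main

import (
	"fmt"
	"reflect"
)

// EventDefinition is a hypothetical, trimmed-down interface.
type EventDefinition interface {
	EventType() string
}

// registry indexes event definitions by their concrete Go type.
type registry struct {
	byGoType map[reflect.Type]EventDefinition
}

func newRegistry() *registry {
	return &registry{byGoType: make(map[reflect.Type]EventDefinition)}
}

// register stores a definition under its dynamic type and rejects duplicates.
func (r *registry) register(def EventDefinition) error {
	t := reflect.TypeOf(def)
	if _, exists := r.byGoType[t]; exists {
		return fmt.Errorf("event definition %v already registered", t)
	}
	r.byGoType[t] = def
	return nil
}

// eventDefinitionByGoType returns the definition registered for type D.
// The caller names the type directly, which is what makes the lookup
// navigable with go-to-definition.
func eventDefinitionByGoType[D EventDefinition](r *registry) (D, bool) {
	var zero D
	def, ok := r.byGoType[reflect.TypeFor[D]()]
	if !ok {
		return zero, false
	}
	typed, ok := def.(D)
	if !ok {
		return zero, false
	}
	return typed, true
}

// scheduledEvent and canceledEvent are hypothetical example definitions.
type scheduledEvent struct{}

func (scheduledEvent) EventType() string { return "NexusOperationScheduled" }

type canceledEvent struct{}

func (canceledEvent) EventType() string { return "NexusOperationCanceled" }

func main() {
	r := newRegistry()
	if err := r.register(scheduledEvent{}); err != nil {
		fmt.Println("register failed:", err)
		return
	}
	def, ok := eventDefinitionByGoType[scheduledEvent](r)
	fmt.Println(ok, def.EventType())
	_, ok = eventDefinitionByGoType[canceledEvent](r)
	fmt.Println(ok)
}
```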
// Clear the next attempt schedule time when leaving BACKING_OFF state. This field is only valid in
// BACKING_OFF state.
o.NextAttemptScheduleTime = nil
o.ClosedTime = timestamppb.New(ctx.Now(o))
Should this use CompleteTime if non-nil?
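If the answer is yes, the terminal bookkeeping could prefer an explicit completion time and fall back to the current time. A minimal sketch under assumptions: plain time.Time replaces timestamppb, and the operation struct and resolve helper are hypothetical simplifications of the code under review.

```go
package main

import (
	"fmt"
	"time"
)

// operation is a hypothetical, simplified stand-in for the operation state.
type operation struct {
	NextAttemptScheduleTime *time.Time
	ClosedTime              time.Time
}

// resolve performs terminal-state bookkeeping: it clears the backoff schedule
// and records the close time, using completeTime when the event supplies one.
func (o *operation) resolve(now time.Time, completeTime *time.Time) {
	o.NextAttemptScheduleTime = nil
	o.ClosedTime = now
	if completeTime != nil {
		o.ClosedTime = *completeTime
	}
}

func main() {
	now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
	reported := now.Add(-time.Second) // completion time carried by the event
	next := now.Add(time.Minute)

	op := &operation{NextAttemptScheduleTime: &next}
	op.resolve(now, &reported)
	fmt.Println(op.ClosedTime.Equal(reported), op.NextAttemptScheduleTime == nil)
}
```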
// The number of attempts made to deliver the start operation request.
// This number represents a minimum bound since the attempt is incremented after the request completes.
int32 attempt = 12;
// This number is approximate, it is incremeted when a task is added to the history queue.
- // This number is approximate, it is incremeted when a task is added to the history queue.
+ // This number is approximate, it is incremented when a task is added to the history queue.
// Special marker for Temporal->Temporal calls to indicate that the original failure should be unwrapped.
// Temporal uses a wrapper operation error with no additional information to transmit the OperationError over the network.
// The meaningful information is in the operation error's cause.
unwrapError := opErr.OriginalFailure.Metadata["unwrap-error"] == "true"
Is OriginalFailure always set? In the branch before this is called, I see a check for handlerErr.OriginalFailure != nil.
Hmm.. it would only be nil if it is constructed in process, and we only do that for HandlerError instances. But good callout, I'll add that here.
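The guard being discussed might look like the sketch below. The failure and operationError types are hypothetical stand-ins for the SDK types, and shouldUnwrap is an illustrative helper name; only the OriginalFailure field, the Metadata lookup, and the "unwrap-error" key are taken from the diff.

```go
package main

import "fmt"

// failure and operationError are hypothetical stand-ins for the SDK types.
type failure struct {
	Message  string
	Metadata map[string]string
}

type operationError struct {
	OriginalFailure *failure
}

// shouldUnwrap reports whether the failure carries the unwrap marker. A nil
// OriginalFailure (an error constructed in process) is treated as "no marker"
// instead of causing a nil pointer dereference.
func shouldUnwrap(opErr *operationError) bool {
	if opErr == nil || opErr.OriginalFailure == nil {
		return false
	}
	// Indexing a nil map is safe in Go and yields the zero value.
	return opErr.OriginalFailure.Metadata["unwrap-error"] == "true"
}

func main() {
	inProcess := &operationError{}
	overNetwork := &operationError{OriginalFailure: &failure{
		Metadata: map[string]string{"unwrap-error": "true"},
	}}
	fmt.Println(shouldUnwrap(inProcess), shouldUnwrap(overNetwork))
}
```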
	}
	return invocationResultFail{failure: failure}, nil
}
if opTimeoutBelowMinErr, ok := errors.AsType[*operationTimeoutBelowMinError](callErr); ok {
nit^2: is the missing newline above this if block intentional? It signals to me that these two belong together more than the other ones but I don't quite see why they do.
I think it was because both of these are cases for errors from the nexus SDK but it's fine either way.
}
if errors.Is(callErr, context.DeadlineExceeded) || errors.Is(callErr, context.Canceled) {
	// If timed out, don't leak internal info to the user
- // If timed out, don't leak internal info to the user
+ // If timed out, don't leak internal info to the user.
if errors.Is(callErr, ErrResponseBodyTooLarge) || errors.Is(callErr, ErrInvalidOperationToken) {
	return nonRetryableFailResult(callErr)
}
If I'm not mistaken these two became invocationResultRetry now instead of being non-retryable?
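If these two errors are meant to stay non-retryable, a unified classifier needs an explicit branch for them ahead of the retryable default. The sketch below illustrates that ordering only: the lowercase sentinel errors, errConnectionReset, the resultKind enum, and classify are hypothetical stand-ins, not the PR's newInvocationResult().

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the sentinel errors named in the diff, plus one
// example of a transient error.
var (
	errResponseBodyTooLarge  = errors.New("response body too large")
	errInvalidOperationToken = errors.New("invalid operation token")
	errConnectionReset       = errors.New("connection reset")
)

// resultKind is a hypothetical simplification of the invocation result types.
type resultKind int

const (
	resultOK resultKind = iota
	resultRetry
	resultFail
)

// classify maps a call error to a result kind. Known permanent errors are
// checked first so they do not fall through to the retryable default.
func classify(callErr error) resultKind {
	switch {
	case callErr == nil:
		return resultOK
	case errors.Is(callErr, errResponseBodyTooLarge),
		errors.Is(callErr, errInvalidOperationToken):
		return resultFail
	default:
		return resultRetry
	}
}

func main() {
	wrapped := fmt.Errorf("start operation: %w", errInvalidOperationToken)
	fmt.Println(classify(wrapped) == resultFail)
	fmt.Println(classify(errConnectionReset) == resultRetry)
}
```

Because errors.Is walks the wrap chain, the permanent-error branch still matches after the error has been wrapped with additional context.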
// saveResult is an UpdateComponent callback that saves the invocation outcome.
func (o *Operation) saveResult(
// saveResultInput is the input to the Operation.saveResult method used in UpdateComponent.
type saveResultInput struct {
nit: should this be saveInvocationResultInput? Granted, a bit long but would match the other types and methods.
6421ece into temporalio:nexus/hsm-to-chasm-migration