Implement Pause/Unpause/Reset/UpdateOptions for standalone activities#10106
Implement Pause/Unpause/Reset/UpdateOptions for standalone activities#10106spkane31 wants to merge 25 commits into
Conversation
569a82b to
d09967d
Compare
eed1b4d to
f5e335f
Compare
| // Incremented each time a new ScheduleToCloseTimeoutTask is scheduled (at activity creation | ||
| // and on each options update that re-schedules the task). Unlike attempt.stamp, this counter | ||
| // is NOT incremented on retries, because schedule-to-close spans the full activity lifetime. | ||
| int32 stamp = 15; |
There was a problem hiding this comment.
Question: This is intended to be a monotonic fencing token for updating activity options generally, is that correct?
In this specific instance you're using it for updating scheduleToClose timeout options, but in general it refers to the entire set of activity options being updated in the DB?
If that's the case, would there be value in describing this fencing token with a name that's a bit more specific to what it's guarding against? I don't have a firm opinion here, but perhaps something indicating that it's relating to the activity options lifecyle perhaps?
If this is a convention within CHASM in which every update is guarded by a field called stamp ignore me though, happy to go with the convention if it's a preexisting thing.
There was a problem hiding this comment.
Updating to schedule_to_close_stamp to make it obvious this stamp is only for schedule_to_close changes.
There was a problem hiding this comment.
ah, I just noticed htis is more of a convention for chasm, feel free to ignore my comment
| bool activity_reset = 17; | ||
|
|
||
| // Set alongside activity_reset when heartbeat details should be cleared on the next retry. | ||
| bool reset_heartbeats = 18; |
There was a problem hiding this comment.
I'd be wary using booleans in IDL generally, because it's very often the case that the two things you want become a third - they usually work better as enums/ints over time.
Perhaps these three ought to be an an update struct, whose presence indicates that there's a need to update the options? Like, for example:
UpdateActivityOptions updateActivityOptions = 16
...
message UpdateActivityOptions {
enum ActivityReset {
NoChange = 0;
ResetAttempts = 1;
}
enum HeartbeatReset {
NoChange = 0;
ResetHeartbeats = 1;
}
ActivityReset activity_reset = 1;
HeartbeatReset reset_heartbeats = 2;
}
There was a problem hiding this comment.
I agree with this sentiment but these are new APIs that are expected to be drop in replacements for the existing methods that only apply to workflow activities, we are planning on removing those APIs in the near future. So we are unfortunately stuck with this behavior because it is exposed in the CLI.
There was a problem hiding this comment.
I'm not quite following here -- this activity proto is internal. (Also what was the reference to the CLI?)
| // Requests may specify how to obtain the namespace ID. Defaults to the "namespace_id" field. | ||
| string namespace_id = 2; | ||
| // Request will be routed by resolving the namespace ID and business ID to a given shard. | ||
| string business_id = 3; |
There was a problem hiding this comment.
is this safe to change? Is it in use? I'm not familiar with this file, so might be off base here, but this seems like a breaking change?
There was a problem hiding this comment.
Here is the original comment but no because this is an entirely internal implementation it does not constitute a breaking change.
| blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, | ||
| blobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter, | ||
| logger log.Logger, | ||
| ) error { |
There was a problem hiding this comment.
I believe this code path is missing generation of a request ID in the case where the client doesn't set it.
if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()Can you check that this is done in all validation code paths where it's needed? In the current PR state I believe there are code paths where we end up comparing request IDs "" == "" and saying "request IDs match". So it would be great to have a test that fails currently, and passes when we add the missing server-side generation of request IDs.
| } | ||
|
|
||
| func validateDescribeActivityExecutionRequest( | ||
| func validateAndNormalizeStartRequest( |
There was a problem hiding this comment.
Could you move this function (validateAndNormalizeStartRequest) down to its original location to make the diff readable?
| if len(req.GetRequestId()) > maxIDLengthLimit { | ||
| return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d", | ||
| len(req.GetRequestId()), maxIDLengthLimit) | ||
| } |
There was a problem hiding this comment.
This function used to have
if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()but it looks like it was removed in a merge.
| ) error { | ||
| if req.GetRequestId() == "" { | ||
| req.RequestId = uuid.NewString() | ||
| } else if len(req.GetRequestId()) > maxIDLengthLimit { |
There was a problem hiding this comment.
Can you check that we're minting missing request IDs and doing the request ID length check in all places needed?
There was a problem hiding this comment.
Checked and added the request ID length check where missing.
| if len(req.GetActivityId()) > maxIDLengthLimit { | ||
| return serviceerror.NewInvalidArgumentf("activity ID exceeds length limit. Length=%d Limit=%d", | ||
| len(req.GetActivityId()), maxIDLengthLimit) | ||
| } |
There was a problem hiding this comment.
Is there a place where we validate that workflow_id is present (and activity_id absent) if workflow_activity_type is present?
There was a problem hiding this comment.
We removed the support for workflow_activity_type so we do not have that validation
75d93d1 to
bb342eb
Compare
fretz12
left a comment
There was a problem hiding this comment.
Approved, but wait for Dan's approval too
| if newReqID != "" && existingReqID == newReqID { | ||
| return &activitypb.PauseActivityExecutionResponse{}, nil | ||
| } | ||
| return &activitypb.PauseActivityExecutionResponse{}, serviceerror.NewFailedPrecondition("activity is already paused") |
There was a problem hiding this comment.
| return &activitypb.PauseActivityExecutionResponse{}, serviceerror.NewFailedPrecondition("activity is already paused") | |
| return nil, serviceerror.NewFailedPrecondition("activity is already paused") |
| // previous attempt or pre-update state are discarded. | ||
| // Note: ScheduleToCloseTimeoutTask uses a separate ActivityState.schedule_to_close_stamp because | ||
| // it spans the full activity lifetime and must not be invalidated on retry. | ||
| // TODO: also invalidate on pause and reset when those are supported. |
* Add four new RPC methods to `historyservice`(PauseActivityExecution,UnpauseActivityExecution,ResetActivityExecution,UpdateACtivityExecutionOptions) * Add new historyservice request/response messages wrapping api repos requests * Generated code First step towards supporting the four activity operator commands on both workflow activities and standalone CHASM activities. The overall aim of this branch is to support operator API commands (Pause, Unpause, Reset, UpdateOptions) for both Standalone Activities (SAA) and Workflow Activities (WA). These will share the same new external workflowservice RPC entrypoints and will be distinguished by the presence of a workflow ID. Pause, Unpause, UpdateOptions, and Reset already exist for WA with an experimental and soon-to-be deprecated public API. The API handlers added in this PR route to the current experimental implementation. We define Frontend and History service RPC handlers in chasm/lib/activity. Requests for both SAA and WA are routed from Frontend to History using these handlers. The handler in the history service (part of the CHASM ActivityService which is hosted by History service) then inspects the workflow ID and activity ID and dispatches according to whether the request is for an SAA vs WA. - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) No runtime behavior changes.
Implements the `UpdateActivityExecutionOptions` RPC for standalone activities. The workflow-activity path is already supported via the existing UpdateActivityOptions history service API this PR only wires up the standalone path end-to-end. Core changes: - `chasm/lib/activity/activity.go` — Adds UpdateActivityExecutionOptions method on Activity: - Validates mutual exclusion of `update_mask` / `restore_original` - `restore_original`: bulk-assigns all fields from the `original_options` snapshot stored at schedule time - Field-mask path: applies only the specified fields via `mergeActivityOptions` - Stamp invalidation: bumps `attempt.Stamp` to cancel in-flight `ActivityDispatchTask` and timeout tasks, then re-queues a new dispatch and schedule-to-start timeout - Retry interval recalculation: after a retry-policy update, recomputes `attempt.CurrentRetryInterval` so the re-dispatch fires at the new (possibly shorter) interval rather than the stale one - Schedule-to-close update: adds a new `ScheduleToCloseTimeoutTask` at the updated deadline so a shortened timeout fires immediately - Adds `original_options` to `ActivityState` proto and populates it in `NewStandaloneActivity` - chasm/lib/activity/handler.go — Routes the standalone path to `chasm.UpdateComponent` to (*Activity).UpdateActivityExecutionOptions - chasm/lib/activity/frontend.go — Adds input validation (`validateUpdateActivityExecutionOptionsRequest`: activity ID required/length, identity length, run ID UUID), and gates the standalone path behind the Enabled config flag while always permitting the workflow path UpdateActivityExecutionOptions is the new unified RPC for updating activity options for both standalone activities and workflow-embedded activities. - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) Minimal, this is into a feature branch
…9851) Implement `PauseActivityExecution` and `UnpauseActivityExecution` for standalone activities. Previously both handlers returned Unimplemented for the SAA path. They now use chasm.UpdateComponent to apply pause/unpause state directly to the CHASM Activity component, matching the semantics of the existing workflow-activity implementation. - Proto (`activity_state.proto`): Added `ActivityPauseState` message (`pause_time`, `identity`, `reason`) and a `pause_state` field on `ActivityState`. - `handlePauseRequested`: Sets `PauseState` on the component. If the activity is in `SCHEDULED` state, increments the attempt stamp so the existing `ActivityDispatchTask` is invalidated — preventing the activity from being dispatched to a worker while paused. For `STARTED` activities the stamp is left unchanged; the worker retains a valid token and receives `ActivityPaused: true` on its next heartbeat. - `handleUnpauseRequested`: Clears `PauseState`, optionally resets the attempt count and/or heartbeat details, and if the activity is `SCHEDULED` bumps the stamp and enqueues a new `ActivityDispatchTask` with optional jitter. - `RecordHeartbeat`: Wires up the `ActivityPaused` response field. - `buildActivityExecutionInfo`: Maps pause state to `PENDING_ACTIVITY_STATE_PAUSED` (activity is scheduled but not running) or `PENDING_ACTIVITY_STATE_PAUSE_REQUESTED` (activity is running on the worker) in the `RunState` field of `DescribeActivityExecution`. `PauseActivityExecution` / `UnpauseActivityExecution` were already implemented for workflow-embedded activities via the history service. Standalone activities had stub handlers that returned `Unimplemented`, this brings SAA to feature parity with workflow activities for the pause/unpause lifecycle operations. - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) Minimal, this is a translation of an existing api (Pause/UnpauseActivity)
## What changed? Fixing a bad rebase by (1) adding `validateAndNormalizeStartActivityExecutionRequest` function, (2) correcting the validateXXX functions, and (3) running `make fmt` to format the proto files. ## Why? Fixes bad rebase and unblocks #10001 and #9852 ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks NA
…ctivity (#10001) ## What changed? When calling `PauseActivityExecution` on an already paused activity a `serviceerror.FailedPrecondition` is returned to the user. ## Why? If a user attempts to pause an already paused activity with an identity and reason and then inspects that activity it will show a different identity and reason which is a confusing behavior, returning an error is a better way to show that the request did not alter the activity. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) ## Potential risks No, this is into a feature branch. After merging the feature branch into main the behavior will change for workflow activities which treat Pause on an already paused activity as a no-op.
## What changed? Implemented ResetActivityExecution for standalone activities (SAA) in chasm/lib/activity. - `activity_state.proto`: Added `activity_reset` (bool) and `reset_heartbeats` (bool) fields to `ActivityState` to carry deferred-reset state across retries. - `statemachine.go`: `TransitionRescheduled` now checks the `ActivityReset` flag before incrementing the attempt count. When set, it zeroes the count first and optionally clears heartbeat details if ResetHeartbeats is also set. - `activity.go`: Added handleReset with two execution paths: - `SCHEDULED`: immediately resets `Count=1`, increments Stamp (invalidating any in-flight dispatch tasks), and enqueues a new ActivityDispatchTask. - `STARTED` / `CANCEL_REQUESTED`: sets `ActivityReset` = true (and optionally ResetHeartbeats = true) so the reset is applied on the next retry via TransitionRescheduled, without touching the running attempt's task token. - Terminal states: returns NotFound. - `handler.go`: Replaced serviceerror.NewUnimplemented(...) in ResetActivityExecution with a chasm.UpdateComponent call using (*Activity).handleReset. ## Why? `ResetActivityExecution` is already implemented for workflow-embedded activities. Standalone activities had stub handlers that returned Unimplemented, this brings SAA to feature parity with workflow activities for the reset operations. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [X] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) ## Potential risks Minimal, this is a translation of an existing api (ResetActivity)
1058a7c to
3afd8d6
Compare
| }) | ||
| require.NoError(t, err) | ||
| require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_PAUSED, descResp.GetInfo().GetRunState()) | ||
|
|
There was a problem hiding this comment.
I inserted a call to Unpause here, expecting the test to fail, but it didn't.
_, err = env.FrontendClient().UnpauseActivityExecution(ctx, &workflowservice.UnpauseActivityExecutionRequest{
Namespace: env.Namespace().String(),
ActivityId: activityID,
RunId: runID,
Identity: "test-identity",
Reason: "test-pause",
})
require.NoError(t, err)There was a problem hiding this comment.
Inserting a pause won't change the assertion, the assertion is validating that no task is dispatched while paused. The timeout on the poll is 2s so it's not guaranteed that the unpause will result in the task dispatch in that time. The assertion you're looking at is in UnpauseWhileScheduled which asserts the task gets dispatched after unpausing the test by using a longer context than the 2s
There was a problem hiding this comment.
No, that's not it. 2s is enough time for the Unpause to trigger a dispatch. The problem is that the 2s timeout you're using is too short. What's happening is that the poll request in the test is rejected with Context timeout is too short for long poll API, so your test assertion is never made, because if err == nil is always false.
So I think we need to increase that to something like 3s. If you do that, then adding an Unpause as I did does cause the test to fail. At the moment the test is not verifying that Pause blocks dispatch (but I'm not aware of any problem with the implementation, just the test.)
| require.NoError(t, err) | ||
| } | ||
|
|
||
| resetActivity := func(ctx context.Context, t *testing.T, activityID, runID string, resetHeartbeat bool) { |
There was a problem hiding this comment.
nit: I believe in Go it's conventional to give a name prefix must to functions that panic on error, so mustResetActivity (same for any other functions of this form)
There was a problem hiding this comment.
This function won't panic, it will fail the test which is expected if the reset call returns an error.
| require.NoError(t, err) | ||
| }) | ||
|
|
||
| t.Run("WhileCancelRequested", func(t *testing.T) { |
There was a problem hiding this comment.
Nit: This test still passes even if the reset is removed. Could make that clear in the name ("WhileCancelRequestedDoesNotFail") or in comments.
| }) | ||
| require.NoError(t, err) | ||
| require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_PAUSED, descResp.GetInfo().GetRunState()) | ||
|
|
There was a problem hiding this comment.
No, that's not it. 2s is enough time for the Unpause to trigger a dispatch. The problem is that the 2s timeout you're using is too short. What's happening is that the poll request in the test is rejected with Context timeout is too short for long poll API, so your test assertion is never made, because if err == nil is always false.
So I think we need to increase that to something like 3s. If you do that, then adding an Unpause as I did does cause the test to fail. At the moment the test is not verifying that Pause blocks dispatch (but I'm not aware of any problem with the implementation, just the test.)
| require.NoError(t, err) | ||
|
|
||
| // Second pause with the same request ID should succeed (idempotent no-op). | ||
| _, err = env.FrontendClient().PauseActivityExecution(ctx, pauseReq) |
There was a problem hiding this comment.
This doesn't actually check it's idempotent, just that it succeeds. To prove idempotency we could change some aspect of the request when it's submitted the second time and show that that aspect is ignored the second time because the server recognized it as a repeat request.
I tried "reason" but I think pause reason is not exposed? Also due to a known bug in CHASM, there's a persistence write causing StateTransitionCount to increase when processing repeat request IDs, so we can't use that either.
If you don't see a way to test idempotency currently, could just leave a comment to that effect.
| CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, | ||
| // TODO(saa-preview): ActivityPaused, ActivityReset | ||
| ActivityPaused: a.PauseState != nil, | ||
| // ActivityReset is intentionally not reported via heartbeat; reset takes effect on the next retry. |
There was a problem hiding this comment.
Why is the field in the proto if we're not using it for this purpose? I would have thought that we want to give a long-running activity a chance to stop executing on reset.
| // via ActivityPaused=true on its next heartbeat. The external run state in that case is | ||
| // PAUSE_REQUESTED. If the worker fails and retries while the flag is set, the retry lands in | ||
| // SCHEDULED with pause_state still populated and the dispatch task is blocked until unpause. | ||
| ACTIVITY_EXECUTION_STATUS_PAUSED = 9; |
There was a problem hiding this comment.
Before we make a decision here, I'd like us to lay out the pros and cons.
When a pause request is received in state Started, there is a change in server state. We could model this explicitly in our state machine, or we could model it with additional state outside the explicit state machine.
In the current implementation, we transition from Scheduled to Paused, but when we receive Pause in state Started we transition to (Started, flag_set). Then, when the attempt is over and we are able to really pause, we enter state (Scheduled, flag_set). But this is confusing -- what's the difference between this and Paused? In fact, I think if we are going to use the flag, it might be cleaner to get rid of Paused state entirely and model pause via the flag alone rather than as a mixture.
There are various places where a code author has to remember to check the pause state as well as the standard statuses. For example, in task validators, in handlePauseRequested, handleUnpauseRequested. There's logic for when to clear the pause state (terminal states, and conditionally in handleReset, etc.
So, what are the pros and cons of the 3 options?
- Add PauseRequested to the state machine. A slightly larger transition matrix but now code just branches on status and we don't worry about code forgetting or not understanding that it has to consider PauseState. This is consistent with the fact that we have a CancelRequested state.
- Use a flag and also have Paused. But this seems to mean that Paused and (Scheduled, flag_set) are confusingly similar.
- Use a flag alone to model pause, no pause-related state machine states.
SAA is using an explicit state machine to model/validate state transitions so we should default to (1). Are we aware of a strong argument for having CancelRequested but not PauseRequested If not, then let's embrace our state machine and implement them consistently.
That leaves ResetRequested. Do you think there's a stronger argument for implementing that one as a flag?
There was a problem hiding this comment.
PauseRequested (and ResetRequested if we decided to add that) would mean that standalone activities have a different behavior than workflow activities, which is something I tried to explicitly avoid. I think it's reasonable to want to use state transitions to model pause requested but this means when we port workflow activities to standalone activities that we have to be careful about behavior changes while doing that transition.
There was a problem hiding this comment.
The only user-visible difference if we use PauseRequested instead of a flag is that we cannot be in both cancel_requested and pause_requested at the same time: the heartbeat response can contain one or the other, but not both (cancel requested has precedence). Would you agree that that's better and clearer behavior? Can you think of any other user-facing differences?
As a general comment, IMO we should focus on designing the best user-facing behavior for Standalone Activity that we can, and we should also set a solid foundation and allow the new CHASM implementations to be good implementations (e.g. avoiding the confusion of having both Paused and Scheduled-with-pause-flag-set). It's OK to change minor details of workflow activity Pause, since this is not GA, especially ones that aren't desirable (being paused and canceled at the same time). We'll want to finalize this before merging -- we don't want to change the schema of the persisted data after it is live.
I'm not sure about ResetRequested, WDYT? Does the implementation work out cleanly with it as an explicit state rather the flags?
There was a problem hiding this comment.
I am ok with using Pause and Reset Requested transitions instead of state flags (I can prototype reset requested), I originally had a PauseRequested but it was removed. I am also trying to think about this in the light of worker commands and the worker nexus channel where the server can directly send commands to a worker instead of waiting for the worker to poll. This is something we probably want to implement in the future for activities (SDKs are still in progress for workflow cancellation request). If we chose to implement this how would that affect the XXXRequested state and end up being something we remove later, either way it's probably fine.
https://github.com/temporalio/temporal/blob/main/docs/architecture/worker-commands.md
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { | ||
| // Worker continues with its existing token — no stamp bump needed, no dispatch task. | ||
| // Cancel takes precedence over pause. Unpause clears the pause flag but does not re-dispatch; | ||
| // the activity remains CANCEL_REQUESTED and will be cancelled when the worker responds. |
There was a problem hiding this comment.
If we're in CancelRequested then Unpause will never result in execution resuming: retryable failures go straight to FAILED. So I'm thinking it should be a FailedPrecondition, like the terminal states are above.
What changed?
This branch implements the activity operator commands feature — a set of server-initiated control APIs (PauseActivityExecution, UnpauseActivityExecution, ResetActivityExecution, UpdateActivityExecutionOptions) for both workflow-embedded and standalone activities.
Why?
Activity operator APIs existed for workflow-embedded activities but were not wired up for standalone activities.
How did you test it?
Potential risks
Minimal, this is a new feature so it won't break users.