Implement UpdateActivityExecutionOptions #9850
| // Discard stale tasks created before a schedule-to-close extension. | ||
| // When the deadline is extended, a new task is added at the updated deadline; any earlier | ||
| // task (from the old shorter deadline) must not time out the activity prematurely. | ||
| if timeout := activity.GetScheduleToCloseTimeout().AsDuration(); timeout > 0 { |
If a user updates the options so that scheduleToClose is 0 (disabled), this validation would cause a spurious fire.
We should add a check:
timeout := activity.GetScheduleToCloseTimeout().AsDuration()
if timeout <= 0 {
return false, nil
}
Also, I'm wondering whether it would be more robust to have a dedicated stamp for scheduleToClose (like we do for attempt). It costs a bit of extra storage, but it avoids time comparisons during validation, which can be fragile. @bergundy @dandavison let me know your thoughts.
Yeah, just put a stamp on this instead IMHO. Just make sure that if there's no stamp on the task it's not used for validation.
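The stamp-based validation discussed in this thread could look roughly like the sketch below. It is not the code from this PR: `activityState`, `scheduleToCloseTask`, and their fields are hypothetical stand-ins, and treating a zero stamp as "task carries no stamp" is an assumption about how unstamped tasks would be represented.

```go
package main

import (
	"fmt"
	"time"
)

// activityState is a hypothetical, simplified stand-in for the activity's
// persisted state; the field names are assumptions made for this sketch.
type activityState struct {
	ScheduleToCloseTimeout time.Duration // 0 means the timeout is disabled
	ScheduleToCloseStamp   int32         // bumped whenever the task is re-scheduled
}

// scheduleToCloseTask is a hypothetical stand-in for the timer task payload.
type scheduleToCloseTask struct {
	Stamp int32 // 0 is assumed to mean "no stamp on this task"
}

// validateScheduleToCloseTask reports whether the task should still fire.
func validateScheduleToCloseTask(a activityState, t scheduleToCloseTask) bool {
	// A disabled timeout must never fire, whatever task is still pending.
	if a.ScheduleToCloseTimeout <= 0 {
		return false
	}
	// A task without a stamp is not validated by stamp at all.
	if t.Stamp == 0 {
		return true
	}
	// Otherwise only the most recently scheduled task is valid.
	return t.Stamp == a.ScheduleToCloseStamp
}

func main() {
	a := activityState{ScheduleToCloseTimeout: time.Minute, ScheduleToCloseStamp: 2}
	fmt.Println(validateScheduleToCloseTask(a, scheduleToCloseTask{Stamp: 1})) // stale task
	fmt.Println(validateScheduleToCloseTask(a, scheduleToCloseTask{Stamp: 2})) // current task
}
```

Comparing integers sidesteps the clock-based comparison entirely, which is the fragility the thread is worried about.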
Let's remember to update the comment here: https://github.com/temporalio/temporal/blob/dan/activity-operator-update-options/chasm/lib/activity/proto/v1/activity_state.proto#L143-L148
…if non terminal, re-validate options, general code hygiene
| chasm.TaskAttributes{ScheduledTime: deadline}, | ||
| &activitypb.StartToCloseTimeoutTask{Stamp: attempt.GetStamp()}, | ||
| ) | ||
| } |
We also need to regenerate the heartbeat task, to cover the case where the pending heartbeat task was invalidated by the stamp bump. It looks like the heartbeating test works around that to make it pass.
Dispatched the heartbeat task using the same logic as in TransitionStarted, and changed the test to remove the heartbeating so that the task generation is forced.
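To illustrate why the heartbeat task has to be regenerated, here is a small in-memory model of stamp invalidation. Everything in it (`attempt`, `timerTask`, `bumpAndRegenerate`, `liveKinds`) is hypothetical and only mirrors the idea described above: bumping the stamp makes every pending task stale, so each timer that is still needed must be re-added under the new stamp.

```go
package main

import (
	"fmt"
	"time"
)

type taskKind string

const (
	startToCloseTimeout taskKind = "start-to-close"
	heartbeatTimeout    taskKind = "heartbeat"
)

// timerTask is a hypothetical stand-in for a scheduled timeout task.
type timerTask struct {
	Kind     taskKind
	Stamp    int32
	Deadline time.Time
}

// attempt is a hypothetical, simplified model of one activity attempt.
type attempt struct {
	Stamp            int32
	StartedAt        time.Time
	StartToClose     time.Duration
	HeartbeatTimeout time.Duration
	Pending          []timerTask
}

// bumpAndRegenerate invalidates every pending task by bumping the stamp, then
// re-adds each timer that is still needed under the new stamp.
func (a *attempt) bumpAndRegenerate() {
	a.Stamp++
	if a.StartToClose > 0 {
		a.Pending = append(a.Pending, timerTask{startToCloseTimeout, a.Stamp, a.StartedAt.Add(a.StartToClose)})
	}
	// Without this branch the old heartbeat task would be stale and nothing
	// would replace it, so a heartbeat timeout could not fire.
	if a.HeartbeatTimeout > 0 {
		a.Pending = append(a.Pending, timerTask{heartbeatTimeout, a.Stamp, a.StartedAt.Add(a.HeartbeatTimeout)})
	}
}

// liveKinds lists the kinds of pending tasks whose stamp is still current.
func (a *attempt) liveKinds() []taskKind {
	var kinds []taskKind
	for _, t := range a.Pending {
		if t.Stamp == a.Stamp {
			kinds = append(kinds, t.Kind)
		}
	}
	return kinds
}

func main() {
	a := &attempt{StartedAt: time.Now(), StartToClose: time.Minute, HeartbeatTimeout: 2 * time.Second}
	a.bumpAndRegenerate() // initial scheduling
	a.bumpAndRegenerate() // options update: old tasks become stale
	fmt.Println(len(a.Pending), a.liveKinds())
}
```

A test built on this model would not need to send a heartbeat after the update, because the timer is armed by the update itself.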
| ) | ||
| } | ||
| } | ||
Maybe worth leaving a comment here for ourselves that we need to handle the StartDelay timer also.
| require.NoError(t, err) | ||
| // Heartbeat once — this creates a new HeartbeatTimeoutTask with the updated 2s timeout | ||
| // and the new stamp. Without this heartbeat the timer would not be armed. |
I don't think we should be doing this -- it's papering over the lack of heartbeat task regeneration in the implementation.
| attempt.Stamp++ | ||
| if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED { |
We also need to consider CANCEL_REQUESTED: we don't want to miss a heartbeat timeout while in CANCEL_REQUESTED. It looks like there's a pre-existing bug in that the start-to-close task validator rejects the task if the activity is in CANCEL_REQUESTED; I've opened #9901 for that. So I believe the condition here should be STARTED || CANCEL_REQUESTED.
Added your changes into the branch
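The condition proposed above can be expressed as a small predicate. This is only a sketch: the `status` type and its constants are hypothetical stand-ins for the real `ACTIVITY_EXECUTION_STATUS_*` enum values.

```go
package main

import "fmt"

// status is a hypothetical stand-in for the activity execution status enum.
type status int

const (
	statusScheduled status = iota
	statusStarted
	statusCancelRequested
	statusCompleted
)

// hasRunningAttempt reports whether a worker may currently be executing the
// activity, in which case the start-to-close and heartbeat timers must stay armed.
func hasRunningAttempt(s status) bool {
	return s == statusStarted || s == statusCancelRequested
}

func main() {
	for _, s := range []status{statusScheduled, statusStarted, statusCancelRequested, statusCompleted} {
		fmt.Println(s, hasRunningAttempt(s))
	}
}
```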
| activity := &Activity{ | ||
| ActivityState: &activitypb.ActivityState{ | ||
| ActivityType: request.ActivityType, | ||
| ActivityState: common.CloneProto(&activitypb.ActivityState{ |
Maybe add a brief comment on why we're cloning.
| // for workflow-embedded activities; complexity is inherent to the per-field update pattern. | ||
| // | ||
| //nolint:revive // cyclomatic: field-mask application mirrors existing updateactivityoptions logic | ||
| func (a *Activity) mergeActivityOptions( |
Can we refactor the core logic here together with the merge method in service/history/api/updateactivityoptions/api.go? It's largely a repeat, and it would be good to have it all in one place.
| return nil | ||
| } | ||
| func validateUpdateActivityExecutionOptionsRequest( |
We should be validating the following:
- TaskQueue: use the standard task queue validation. Skipping it is a security risk, because it can cause an activity to be executed on the per-namespace-worker task queue.
- ScheduleToCloseTimeout: validate and normalize with the standard timeout validator
- ScheduleToStartTimeout: validate and normalize with the standard timeout validator
- StartToCloseTimeout: validate and normalize with the standard timeout validator
- HeartbeatTimeout: validate and normalize with the standard timeout validator
- Priority: already validated
- RetryPolicy: I'm pretty sure there's a validator, and maybe a normalizer, for this.
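As a rough illustration of the checks listed above, the sketch below validates a task queue name and a set of timeouts. It does not use the server's standard validators: `reservedPrefix`, `maxTaskQueueNameLen`, `options`, and all function names are placeholders invented for this example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// reservedPrefix is a placeholder for whatever prefix the server reserves for
// internal task queues; the real value is not taken from the source.
const reservedPrefix = "internal-"

// maxTaskQueueNameLen is a hypothetical length limit.
const maxTaskQueueNameLen = 1000

// options is a hypothetical subset of the updatable activity options.
type options struct {
	TaskQueue              string
	ScheduleToCloseTimeout time.Duration
	StartToCloseTimeout    time.Duration
	HeartbeatTimeout       time.Duration
}

// validateTaskQueue rejects empty, oversized, and reserved task queue names.
func validateTaskQueue(name string) error {
	switch {
	case name == "":
		return errors.New("task queue name is required")
	case len(name) > maxTaskQueueNameLen:
		return errors.New("task queue name is too long")
	case strings.HasPrefix(name, reservedPrefix):
		return fmt.Errorf("task queue name %q uses a reserved prefix", name)
	}
	return nil
}

// validateTimeout rejects negative durations; zero means "not set".
func validateTimeout(field string, d time.Duration) error {
	if d < 0 {
		return fmt.Errorf("%s must not be negative, got %v", field, d)
	}
	return nil
}

// validateOptions returns the first validation error, if any.
func validateOptions(o options) error {
	if err := validateTaskQueue(o.TaskQueue); err != nil {
		return err
	}
	timeouts := []struct {
		name  string
		value time.Duration
	}{
		{"schedule_to_close_timeout", o.ScheduleToCloseTimeout},
		{"start_to_close_timeout", o.StartToCloseTimeout},
		{"heartbeat_timeout", o.HeartbeatTimeout},
	}
	for _, t := range timeouts {
		if err := validateTimeout(t.name, t.value); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(validateOptions(options{TaskQueue: "orders", StartToCloseTimeout: time.Minute}))
	fmt.Println(validateOptions(options{TaskQueue: "internal-workers"}))
	fmt.Println(validateOptions(options{TaskQueue: "orders", HeartbeatTimeout: -time.Second}))
}
```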
| // An incremental version number used to validate ScheduleToCloseTimeoutTask tasks. | ||
| // Incremented each time a new ScheduleToCloseTimeoutTask is scheduled (at activity creation | ||
| // and on each options update that re-schedules the task). Unlike attempt.stamp, this counter | ||
| // is NOT reset on retries, because schedule-to-close spans the full activity lifetime. |
Suggested change (replace the first line with the second):
| // is NOT reset on retries, because schedule-to-close spans the full activity lifetime. | |
| // is NOT incremented on retries, because schedule-to-close spans the full activity lifetime. |
| } | ||
| message ScheduleToCloseTimeoutTask { | ||
| // The current schedule-to-close stamp for this activity. Used for task validation. |
Suggested change (replace the first line with the second):
| // The current schedule-to-close stamp for this activity. Used for task validation. | |
| // The schedule-to-close stamp for this task. Used for task validation. |
| // The next heartbeat time is the max of (the last heartbeats recorded time and | ||
| // the current attempts started time) plus the heartbeat timeout | ||
| lastHb, _ := a.LastHeartbeat.TryGet(ctx) | ||
| lastHBTime := util.MaxTime( |
nit: inconsistent lastHb vs lastHB
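The rule in the code comment above (the next heartbeat deadline is the later of the last recorded heartbeat and the attempt start, plus the heartbeat timeout) can be sketched on its own. The function and helper names here are hypothetical, and a zero `time.Time` is assumed to mean that no heartbeat has been recorded yet.

```go
package main

import (
	"fmt"
	"time"
)

// noHeartbeat is the zero time, used here to mean "no heartbeat recorded yet".
var noHeartbeat time.Time

// at and secs are small helpers for building example values.
func at(unixSec int64) time.Time { return time.Unix(unixSec, 0).UTC() }
func secs(n int64) time.Duration { return time.Duration(n) * time.Second }

// nextHeartbeatDeadline returns max(lastHeartbeat, attemptStarted) + timeout.
func nextHeartbeatDeadline(lastHeartbeat, attemptStarted time.Time, timeout time.Duration) time.Time {
	base := attemptStarted
	if lastHeartbeat.After(base) {
		base = lastHeartbeat
	}
	return base.Add(timeout)
}

func main() {
	started := at(100)
	// A heartbeat 30s into the attempt pushes the deadline out.
	fmt.Println(nextHeartbeatDeadline(at(130), started, secs(10)).Unix())
	// With no heartbeat yet, the deadline is measured from the attempt start.
	fmt.Println(nextHeartbeatDeadline(noHeartbeat, started, secs(10)).Unix())
}
```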
| mergeInto.RetryPolicy = &commonpb.RetryPolicy{} | ||
| } | ||
| mergeInto.RetryPolicy.MaximumAttempts = mergeFrom.GetRetryPolicy().GetMaximumAttempts() | ||
| } |
Not blocking this PR, but looking at this feature and the existing UpdateWorkflowOptions feature, it would be easy to add an option and forget to handle it in the merge/update/value-clearing logic. It would be good to have tests that use protobuf reflection to verify that all possible paths are either handled by the update APIs or intentionally not offered for update/clearing.
Added a test for this, great idea!
Merged commit 0cbb52d into feature/activity-operator-cmds
What changed?

Implements the `UpdateActivityExecutionOptions` RPC for standalone activities. The workflow-activity path is already supported via the existing UpdateActivityOptions history service API; this PR only wires up the standalone path end-to-end.

Core changes:

- `chasm/lib/activity/activity.go`: Adds the `UpdateActivityExecutionOptions` method on Activity:
  - Validates mutual exclusion of `update_mask` / `restore_original`
  - `restore_original`: bulk-assigns all fields from the `original_options` snapshot stored at schedule time
  - Field-mask path: applies only the specified fields via `mergeActivityOptions`
  - Stamp invalidation: bumps `attempt.Stamp` to cancel in-flight `ActivityDispatchTask` and timeout tasks, then re-queues a new dispatch and schedule-to-start timeout
  - Retry interval recalculation: after a retry-policy update, recomputes `attempt.CurrentRetryInterval` so the re-dispatch fires at the new (possibly shorter) interval rather than the stale one
  - Schedule-to-close update: adds a new `ScheduleToCloseTimeoutTask` at the updated deadline so a shortened timeout fires immediately
  - Adds `original_options` to the `ActivityState` proto and populates it in `NewStandaloneActivity`
- `chasm/lib/activity/handler.go`: Routes the standalone path through `chasm.UpdateComponent` to `(*Activity).UpdateActivityExecutionOptions`
- `chasm/lib/activity/frontend.go`: Adds input validation (`validateUpdateActivityExecutionOptionsRequest`: activity ID required/length, identity length, run ID UUID), and gates the standalone path behind the Enabled config flag while always permitting the workflow path

Why?

UpdateActivityExecutionOptions is the new unified RPC for updating activity options for both standalone activities and workflow-embedded activities.

How did you test it?

- [ ] built
- [ ] run locally and tested manually
- [ ] covered by existing tests
- [ ] added new unit test(s)
- [X] added new functional test(s)

Potential risks

Minimal, this is into a feature branch
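The update semantics described above (mutual exclusion of `update_mask` and `restore_original`, bulk restore from the original snapshot, and per-field application of the mask) can be sketched as below. This is a simplified illustration, not the PR's `mergeActivityOptions`: the `options` and `updateRequest` types, the `applyUpdate` function, and the set of supported mask paths are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// options is a hypothetical subset of the updatable activity options.
type options struct {
	TaskQueue           string
	StartToCloseTimeout time.Duration
	HeartbeatTimeout    time.Duration
}

// updateRequest is a hypothetical, simplified form of the RPC request.
type updateRequest struct {
	Options         options
	UpdateMask      []string // snake_case field paths to apply
	RestoreOriginal bool
}

// applyUpdate returns the options that result from applying req to current.
// original is the snapshot taken when the activity was scheduled.
func applyUpdate(current, original options, req updateRequest) (options, error) {
	if req.RestoreOriginal && len(req.UpdateMask) > 0 {
		return current, errors.New("update_mask and restore_original are mutually exclusive")
	}
	if req.RestoreOriginal {
		return original, nil
	}
	updated := current
	for _, path := range req.UpdateMask {
		// Only fields named in the mask are touched; everything else keeps
		// its current value even if the request carries a different one.
		switch path {
		case "task_queue":
			updated.TaskQueue = req.Options.TaskQueue
		case "start_to_close_timeout":
			updated.StartToCloseTimeout = req.Options.StartToCloseTimeout
		case "heartbeat_timeout":
			updated.HeartbeatTimeout = req.Options.HeartbeatTimeout
		default:
			return current, fmt.Errorf("unsupported update_mask path %q", path)
		}
	}
	return updated, nil
}

func main() {
	original := options{TaskQueue: "orders", StartToCloseTimeout: time.Minute}
	current := options{TaskQueue: "orders", StartToCloseTimeout: 5 * time.Minute}

	req := updateRequest{
		Options:    options{TaskQueue: "ignored", HeartbeatTimeout: 2 * time.Second},
		UpdateMask: []string{"heartbeat_timeout"},
	}
	fmt.Println(applyUpdate(current, original, req))
	fmt.Println(applyUpdate(current, original, updateRequest{RestoreOriginal: true}))
}
```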