Skip to content

Commit

Permalink
Fix schedule workflow to CAN after signals (#5799)
Browse files Browse the repository at this point in the history
## What changed and why?
If a schedule was paused and received a large number of signals
(trigger, backfill, etc.), it wouldn't be able to continue-as-new and
history size could grow past the suggested limit.

For various reasons the main loop receives and processes signals across
iterations, with checking for CAN in between, so a wakeup due to a
signal (instead of a timer) would prevent CAN. Refactoring is hard due
to the determinism requirement, so the simplest fix is to do a second
check.

## How did you test it?
New unit test
  • Loading branch information
dnr authored and yycptt committed Apr 26, 2024
1 parent 1bb03b7 commit 99b6e0c
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
9 changes: 9 additions & 0 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ const (
IncrementalBackfill = 5
// update from previous action instead of current time
UpdateFromPrevious = 6
// do continue-as-new after pending signals
CANAfterSignals = 7
)

const (
Expand Down Expand Up @@ -313,6 +315,13 @@ func (s *scheduler) run() error {
if exp := s.getRetentionExpiration(nextWakeup); !exp.IsZero() && !exp.After(s.now()) {
return nil
}
if suggestContinueAsNew && s.pendingUpdate == nil && s.pendingPatch == nil && s.hasMinVersion(CANAfterSignals) {
// If suggestContinueAsNew was true but we had a pending update or patch, we would
// not break above, but process the update/patch. Now that we're done, we should
// break here to do CAN. (pendingUpdate and pendingPatch should always nil here,
// the check check above is just being defensive.)
break
}

// sleep returns on any of:
// 1. requested time elapsed
Expand Down
61 changes: 61 additions & 0 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2049,6 +2049,67 @@ func (s *workflowSuite) TestCANBySuggested() {
s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError()))
}

func (s *workflowSuite) TestCANBySuggestedWithSignals() {
// TODO: remove once default version is CANAfterSignals
prevTweakables := currentTweakablePolicies
currentTweakablePolicies.Version = CANAfterSignals
defer func() { currentTweakablePolicies = prevTweakables }()

// written using low-level mocks so we can control iteration count

runs := []time.Duration{
1 * time.Minute,
2 * time.Minute,
3 * time.Minute,
5 * time.Minute, // suggestCAN will be true for this signal
8 * time.Minute, // this one won't be reached
}
suggestCANAt := 4 * time.Minute
for _, d := range runs {
t := baseStartTime.Add(d)
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Equal("myid-"+t.Format(time.RFC3339), req.Request.WorkflowId)
return nil, nil
})
if d > suggestCANAt {
// the first one after the CAN flag is flipped will run, further ones will not
break
}
}
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Fail("too many starts", req.Request.WorkflowId)
return nil, nil
}).Times(0).Maybe()

s.env.RegisterDelayedCallback(func() {
s.env.SetContinueAsNewSuggested(true)
}, suggestCANAt)

for _, d := range runs {
s.env.RegisterDelayedCallback(func() {
s.env.SignalWorkflow(SignalNamePatch, &schedpb.SchedulePatch{
TriggerImmediately: &schedpb.TriggerImmediatelyRequest{},
})
}, d)
}

s.run(&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: durationpb.New(100 * time.Minute),
}},
},
Policies: &schedpb.SchedulePolicies{
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
},
State: &schedpb.ScheduleState{
Paused: true,
},
}, 0) // 0 means use suggested
s.True(s.env.IsWorkflowCompleted())
s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError()))
}

func (s *workflowSuite) TestCANBySignal() {
// written using low-level mocks so we can control iteration count

Expand Down

0 comments on commit 99b6e0c

Please sign in to comment.