Skip to content

Commit

Permalink
Use ContinueAsNewSuggested in scheduler workflow (#4990)
Browse files Browse the repository at this point in the history
**What changed?**
Scheduler workflow uses server-sent suggestion for when to
continue-as-new instead of fixed iteration count.

Note this is not enabled in this PR yet.

**Why?**
Automatically handle history size/event count too large conditions (or
any future conditions added by the server), which we might get if we do
more work than expected per iteration.

**How did you test it?**
new unit tests, also replaced for loop with previous version to verify
actual iteration count didn't change

**Potential risks**
The default history size suggestion is at 4MB, which we could hit after
just a few large payload responses, and then we'd do continue-as-new
more often than we might like.
  • Loading branch information
dnr authored and rodrigozhou committed Oct 31, 2023
1 parent 88f8f11 commit b8163ce
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 7 deletions.
19 changes: 12 additions & 7 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ type (
RecentActionCount int // The number of recent actual action results to include in Describe.
FutureActionCountForList int // The number of future action times to include in List (search attr).
RecentActionCountForList int // The number of recent actual action results to include in List (search attr).
IterationsBeforeContinueAsNew int
SleepWhilePaused bool // If true, don't set timers while paused/out of actions
IterationsBeforeContinueAsNew int // Number of iterations per run, or 0 to use server-suggested
SleepWhilePaused bool // If true, don't set timers while paused/out of actions
// MaxBufferSize limits the number of buffered starts. This also limits the number of
// workflows that can be backfilled at once (since they all have to fit in the buffer).
MaxBufferSize int
Expand Down Expand Up @@ -191,7 +191,7 @@ var (
RecentActionCount: 10,
FutureActionCountForList: 5,
RecentActionCountForList: 5,
IterationsBeforeContinueAsNew: 500,
IterationsBeforeContinueAsNew: 500, // TODO: change to 0 to use GetContinueAsNewSuggested
SleepWhilePaused: true,
MaxBufferSize: 1000,
AllowZeroSleep: true,
Expand Down Expand Up @@ -253,12 +253,17 @@ func (s *scheduler) run() error {

iters := s.tweakables.IterationsBeforeContinueAsNew
for {
// TODO: use the real GetContinueAsNewSuggested
continueAsNewSuggested := iters <= 0 || workflow.GetInfo(s.ctx).GetCurrentHistoryLength() >= impossibleHistorySize
if continueAsNewSuggested && s.pendingUpdate == nil && s.pendingPatch == nil {
info := workflow.GetInfo(s.ctx)
suggestContinueAsNew := info.GetCurrentHistoryLength() >= impossibleHistorySize
if s.tweakables.IterationsBeforeContinueAsNew > 0 {
suggestContinueAsNew = suggestContinueAsNew || iters <= 0
iters--
} else {
suggestContinueAsNew = suggestContinueAsNew || info.GetContinueAsNewSuggested()
}
if suggestContinueAsNew && s.pendingUpdate == nil && s.pendingPatch == nil {
break
}
iters--

t1 := timestamp.TimeValue(s.State.LastProcessedTime)
t2 := s.now()
Expand Down
73 changes: 73 additions & 0 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1700,3 +1700,76 @@ func (s *workflowSuite) TestExitScheduleWorkflowWhenEmpty() {
s.False(workflow.IsContinueAsNewError(s.env.GetWorkflowError()))
s.True(s.env.Now().Sub(baseStartTime) == currentTweakablePolicies.RetentionTime)
}

func (s *workflowSuite) TestCANByIterations() {
// written using low-level mocks so we can control iteration count

const iters = 30
// note: one fewer run than iters since the first doesn't start anything
for i := 1; i < iters; i++ {
t := baseStartTime.Add(5 * time.Minute * time.Duration(i))
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Equal("myid-"+t.Format(time.RFC3339), req.Request.WorkflowId)
return nil, nil
})
}
// this one catches and fails if we go over
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Fail("too many starts")
return nil, nil
}).Times(0).Maybe()

// this is ignored because we set iters explicitly
s.env.RegisterDelayedCallback(func() {
s.env.SetContinueAsNewSuggested(true)
}, 5*time.Minute*iters/2-time.Second)

s.run(&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: timestamp.DurationPtr(5 * time.Minute),
}},
},
Policies: &schedpb.SchedulePolicies{
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
},
}, iters)
s.True(s.env.IsWorkflowCompleted())
s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError()))
}

func (s *workflowSuite) TestCANBySuggested() {
// written using low-level mocks so we can control iteration count

const iters = 30
// note: one fewer run than iters since the first doesn't start anything
for i := 1; i < iters; i++ {
t := baseStartTime.Add(5 * time.Minute * time.Duration(i))
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Equal("myid-"+t.Format(time.RFC3339), req.Request.WorkflowId)
return nil, nil
})
}
// this one catches and fails if we go over
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Fail("too many starts", req.Request.WorkflowId)
return nil, nil
}).Times(0).Maybe()

s.env.RegisterDelayedCallback(func() {
s.env.SetContinueAsNewSuggested(true)
}, 5*time.Minute*iters-time.Second)

s.run(&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: timestamp.DurationPtr(5 * time.Minute),
}},
},
Policies: &schedpb.SchedulePolicies{
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
},
}, 0) // 0 means use suggested
s.True(s.env.IsWorkflowCompleted())
s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError()))
}

0 comments on commit b8163ce

Please sign in to comment.