Skip to content

Commit

Permalink
Use MutableSideEffect for scheduler workflow tweakables (#2906)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Jun 1, 2022
1 parent 78beb81 commit 775d1fb
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 44 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/sdk/metric v0.30.0
go.temporal.io/api v1.7.1-0.20220531210845-69c8b41408d0
go.temporal.io/sdk v1.14.1-0.20220513214234-7ccfda678fed
go.temporal.io/sdk v1.14.1-0.20220525140819-54f4148173a9
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.9.0
go.uber.org/fx v1.17.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.temporal.io/api v1.7.1-0.20220510183009-449d18444c9a/go.mod h1:YU5EQaONkIr0ZRju0NqdqYNH/hCkBuwqRMDA0iaj7JM=
go.temporal.io/api v1.7.1-0.20220531210845-69c8b41408d0 h1:+cFOs2QyDq4boo15vTUBlDQOBf3HTOEA5n3bMiYHBaU=
go.temporal.io/api v1.7.1-0.20220531210845-69c8b41408d0/go.mod h1:JMB2socWBdrPOwarYntcurp/HgKWcpkA4mQKRD6D9Y4=
go.temporal.io/sdk v1.14.1-0.20220513214234-7ccfda678fed h1:QrZQUSnsOsTZFh4bq5wyOMvl2uZbJJ80JiW/6Gs3eUY=
go.temporal.io/sdk v1.14.1-0.20220513214234-7ccfda678fed/go.mod h1:d1S1ETFShrybdBYxhqizQASREk0G9oi4RE2VKsYGHAk=
go.temporal.io/sdk v1.14.1-0.20220525140819-54f4148173a9 h1:6XjIRR49O5FAXF6f64EP1N5VfCs8/IPIqXfXWQcRCp8=
go.temporal.io/sdk v1.14.1-0.20220525140819-54f4148173a9/go.mod h1:d1S1ETFShrybdBYxhqizQASREk0G9oi4RE2VKsYGHAk=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
11 changes: 3 additions & 8 deletions service/worker/scheduler/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
)

const (
defaultJitter = 1 * time.Second
)

type (
compiledSpec struct {
spec *schedpb.ScheduleSpec
Expand Down Expand Up @@ -166,12 +162,11 @@ func (cs *compiledSpec) excluded(nominal time.Time) bool {
}

// Adds jitter to a nominal time, deterministically (by hashing the given time). The range
// of jitter is the min of the schedule spec's jitter (default 1s if missing) and the
// given limit value.
// of the jitter is zero to the min of the schedule spec's jitter and the given limit value.
func (cs *compiledSpec) addJitter(nominal time.Time, limit time.Duration) time.Time {
maxJitter := timestamp.DurationValue(cs.spec.Jitter)
if maxJitter == 0 {
maxJitter = defaultJitter
if maxJitter < 0 {
maxJitter = 0
}
if maxJitter > limit {
maxJitter = limit
Expand Down
3 changes: 3 additions & 0 deletions service/worker/scheduler/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (s *specSuite) TestSpecExclude() {
Second: "*",
},
},
Jitter: timestamp.DurationPtr(1 * time.Second),
},
time.Date(2022, 3, 23, 8, 00, 0, 0, time.UTC),
time.Date(2022, 3, 23, 9, 00, 0, 235000000, time.UTC),
Expand All @@ -214,6 +215,7 @@ func (s *specSuite) TestSpecStartTime() {
{Interval: timestamp.DurationPtr(90 * time.Minute)},
},
StartTime: timestamp.TimePtr(time.Date(2022, 3, 23, 12, 0, 0, 0, time.UTC)),
Jitter: timestamp.DurationPtr(1 * time.Second),
},
time.Date(2022, 3, 23, 8, 00, 0, 0, time.UTC),
time.Date(2022, 3, 23, 12, 00, 0, 162000000, time.UTC),
Expand All @@ -229,6 +231,7 @@ func (s *specSuite) TestSpecEndTime() {
{Interval: timestamp.DurationPtr(90 * time.Minute)},
},
EndTime: timestamp.TimePtr(time.Date(2022, 3, 23, 14, 0, 0, 0, time.UTC)),
Jitter: timestamp.DurationPtr(1 * time.Second),
},
time.Date(2022, 3, 23, 11, 00, 0, 0, time.UTC),
time.Date(2022, 3, 23, 12, 00, 0, 162000000, time.UTC),
Expand Down
83 changes: 50 additions & 33 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,10 @@ const (

InitialConflictToken = 1

// The number of future action times to include in Describe.
futureActionCount = 10
// The number of recent actual action results to include in Describe.
recentActionCount = 10

// Maximum number of times to list per ListMatchingTimes query.
// Maximum number of times to list per ListMatchingTimes query. (This is used only in a
// query so it can be changed without breaking history.)
maxListMatchingTimesCount = 1000

// TODO: replace with event count or hint from server
iterationsBeforeContinueAsNew = 500

searchAttrStartTime = "TemporalScheduledStartTime"
searchAttrScheduleById = "TemporalScheduledById"
)
Expand All @@ -86,6 +79,8 @@ type (

cspec *compiledSpec

tweakables tweakablePolicies

// We might have zero or one long-poll watcher activity running. If so, these are set:
watchingWorkflowId string
watchingFuture workflow.Future
Expand All @@ -94,8 +89,14 @@ type (
needRefresh bool
}

catchupWindowParams struct {
Default, Min time.Duration
tweakablePolicies struct {
DefaultCatchupWindow time.Duration // Default for catchup window
MinCatchupWindow time.Duration // Minimum for catchup window
CanceledTerminatedCountAsFailures bool // Whether cancelled+terminated count for pause-on-failure
AlwaysAppendTimestamp bool // Whether to append timestamp for non-overlapping workflows too
FutureActionCount int // The number of future action times to include in Describe.
RecentActionCount int // The number of recent actual action results to include in Describe.
IterationsBeforeContinueAsNew int
}
)

Expand All @@ -112,9 +113,17 @@ var (
},
}

defaultCatchupWindowParams = catchupWindowParams{
Default: 60 * time.Second,
Min: 10 * time.Second,
// We put a handful of options in a static value and use it as a MutableSideEffect within
// the workflow so that we can change them without breaking existing executions or having
// to use versioning.
currentTweakablePolicies = tweakablePolicies{
DefaultCatchupWindow: 60 * time.Second,
MinCatchupWindow: 10 * time.Second,
CanceledTerminatedCountAsFailures: false,
AlwaysAppendTimestamp: true,
FutureActionCount: 10,
RecentActionCount: 10,
IterationsBeforeContinueAsNew: 500,
}

errUpdateConflict = errors.New("conflicting concurrent update")
Expand All @@ -134,6 +143,7 @@ func (s *scheduler) run() error {
s.logger.Info("Schedule starting", "schedule", s.Schedule)

s.ensureFields()
s.updateTweakables()
s.compileSpec()

if err := workflow.SetQueryHandler(s.ctx, QueryNameDescribe, s.handleDescribeQuery); err != nil {
Expand All @@ -156,7 +166,7 @@ func (s *scheduler) run() error {
s.processPatch(s.InitialPatch)
s.InitialPatch = nil

for iters := iterationsBeforeContinueAsNew; iters >= 0; iters-- {
for iters := s.tweakables.IterationsBeforeContinueAsNew; iters >= 0; iters-- {
t1 := timestamp.TimeValue(s.State.LastProcessedTime)
t2 := s.now()
if t2.Before(t1) {
Expand All @@ -178,6 +188,7 @@ func (s *scheduler) run() error {
// 2. we got a signal (update, request, refresh)
// 3. a workflow that we were watching finished
s.sleep(nextSleep, hasNext)
s.updateTweakables()
}

// Any watcher activities will get cancelled automatically if running.
Expand Down Expand Up @@ -395,7 +406,10 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) {
}

// handle pause-on-failure
failedStatus := res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_FAILED || res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT
failedStatus := res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_FAILED ||
res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT ||
(s.tweakables.CanceledTerminatedCountAsFailures &&
(res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED || res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED))
pauseOnFailure := s.Schedule.Policies.PauseOnFailure && failedStatus && !s.Schedule.State.Paused
if pauseOnFailure {
s.Schedule.State.Paused = true
Expand Down Expand Up @@ -462,9 +476,9 @@ func (s *scheduler) handleRefreshSignal(ch workflow.ReceiveChannel, _ bool) {
func (s *scheduler) handleDescribeQuery() (*schedspb.DescribeResponse, error) {
// update future actions
if s.cspec != nil {
s.Info.FutureActionTimes = make([]*time.Time, 0, futureActionCount)
s.Info.FutureActionTimes = make([]*time.Time, 0, s.tweakables.FutureActionCount)
t1 := timestamp.TimeValue(s.State.LastProcessedTime)
for len(s.Info.FutureActionTimes) < futureActionCount {
for len(s.Info.FutureActionTimes) < s.tweakables.FutureActionCount {
var has bool
_, t1, has = s.cspec.getNextTime(t1)
if !has {
Expand Down Expand Up @@ -514,21 +528,21 @@ func (s *scheduler) checkConflict(token int64) error {
return errUpdateConflict
}

func (s *scheduler) updateTweakables() {
// Use MutableSideEffect so that we can change the defaults without breaking determinism.
get := func(ctx workflow.Context) interface{} { return currentTweakablePolicies }
eq := func(a, b interface{}) bool { return a.(tweakablePolicies) == b.(tweakablePolicies) }
if err := workflow.MutableSideEffect(s.ctx, "tweakables", get, eq).Get(&s.tweakables); err != nil {
panic("can't decode tweakablePolicies:" + err.Error())
}
}

func (s *scheduler) getCatchupWindow() time.Duration {
// TODO: re-enable this after it works?
// // Use MutableSideEffect so that we can change the defaults without breaking determinism.
// get := func(ctx workflow.Context) interface{} { return defaultCatchupWindowParams }
// eq := func(a, b interface{}) bool { return a.(catchupWindowParams) == b.(catchupWindowParams) }
// var params catchupWindowParams
// if err := workflow.MutableSideEffect(s.ctx, "defaultCatchupWindowParams", get, eq).Get(&params); err != nil {
// panic("can't decode catchupWindowParams:" + err.Error())
// }
params := defaultCatchupWindowParams
cw := s.Schedule.Policies.CatchupWindow
if cw == nil {
return params.Default
} else if *cw < params.Min {
return params.Min
return s.tweakables.DefaultCatchupWindow
} else if *cw < s.tweakables.MinCatchupWindow {
return s.tweakables.MinCatchupWindow
} else {
return *cw
}
Expand Down Expand Up @@ -636,7 +650,7 @@ func (s *scheduler) processBuffer() bool {
func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) {
s.Info.ActionCount++
s.Info.RecentActions = append(s.Info.RecentActions, result)
extra := len(s.Info.RecentActions) - 10
extra := len(s.Info.RecentActions) - s.tweakables.RecentActionCount
if extra > 0 {
s.Info.RecentActions = s.Info.RecentActions[extra:]
}
Expand All @@ -649,9 +663,12 @@ func (s *scheduler) startWorkflow(
start *schedspb.BufferedStart,
newWorkflow *workflowpb.NewWorkflowExecutionInfo,
) (*schedpb.ScheduleActionResult, error) {
// must match AppendedTimestampForValidation
nominalTimeSec := start.NominalTime.UTC().Truncate(time.Second)
workflowID := newWorkflow.WorkflowId + "-" + nominalTimeSec.Format(time.RFC3339)
workflowID := newWorkflow.WorkflowId
if start.OverlapPolicy == enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL || s.tweakables.AlwaysAppendTimestamp {
// must match AppendedTimestampForValidation
workflowID += "-" + nominalTimeSec.Format(time.RFC3339)
}

// Set scheduleToCloseTimeout based on catchup window, which is the latest time that it's
// acceptable to start this workflow. For manual starts (trigger immediately or backfill),
Expand Down

0 comments on commit 775d1fb

Please sign in to comment.