Skip to content

Commit

Permalink
Process signals later in scheduler workflow (#3397)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Sep 16, 2022
1 parent de91ae4 commit 86966c5
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 23 deletions.
57 changes: 37 additions & 20 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ type (
watchingWorkflowId string
watchingFuture workflow.Future

// Signal requests
pendingPatch *schedpb.SchedulePatch
pendingUpdate *schedspb.FullUpdateRequest

uuidBatch []string
}

Expand Down Expand Up @@ -170,11 +174,12 @@ func (s *scheduler) run() error {
s.Info.CreateTime = s.State.LastProcessedTime
}

// A schedule may be created with an initial Patch, e.g. start one immediately. Handle that now.
s.processPatch(s.InitialPatch)
// A schedule may be created with an initial Patch, e.g. start one immediately. Put that in
// the state so it takes effect below.
s.pendingPatch = s.InitialPatch
s.InitialPatch = nil

for iters := s.tweakables.IterationsBeforeContinueAsNew; iters > 0; iters-- {
for iters := s.tweakables.IterationsBeforeContinueAsNew; iters > 0 || s.pendingUpdate != nil || s.pendingPatch != nil; iters-- {
t1 := timestamp.TimeValue(s.State.LastProcessedTime)
t2 := s.now()
if t2.Before(t1) {
Expand All @@ -189,6 +194,13 @@ func (s *scheduler) run() error {
false,
)
s.State.LastProcessedTime = timestamp.TimePtr(t2)
// handle signals after processing time range that just elapsed
scheduleChanged := s.processSignals()
if scheduleChanged {
// need to calculate sleep again
nextSleep = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
}
// try starting workflows in the buffer
for s.processBuffer() {
}
s.updateMemoAndSearchAttributes()
Expand Down Expand Up @@ -266,11 +278,7 @@ func (s *scheduler) now() time.Time {
}

func (s *scheduler) processPatch(patch *schedpb.SchedulePatch) {
s.logger.Debug("processPatch", "patch", patch)

if patch == nil {
return
}
s.logger.Info("Schedule patch", "patch", patch.String())

if trigger := patch.TriggerImmediately; trigger != nil {
now := s.now()
Expand Down Expand Up @@ -367,10 +375,14 @@ func (s *scheduler) sleep(nextSleep time.Duration) {
sel := workflow.NewSelector(s.ctx)

upCh := workflow.GetSignalChannel(s.ctx, SignalNameUpdate)
sel.AddReceive(upCh, s.handleUpdateSignal)
sel.AddReceive(upCh, func(ch workflow.ReceiveChannel, _ bool) {
ch.Receive(s.ctx, &s.pendingUpdate)
})

reqCh := workflow.GetSignalChannel(s.ctx, SignalNamePatch)
sel.AddReceive(reqCh, s.handlePatchSignal)
sel.AddReceive(reqCh, func(ch workflow.ReceiveChannel, _ bool) {
ch.Receive(s.ctx, &s.pendingPatch)
})

refreshCh := workflow.GetSignalChannel(s.ctx, SignalNameRefresh)
sel.AddReceive(refreshCh, s.handleRefreshSignal)
Expand Down Expand Up @@ -450,9 +462,7 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) {
s.logger.Info("started workflow finished", "workflow", id, "status", res.Status, "pause-after-failure", pauseOnFailure)
}

func (s *scheduler) handleUpdateSignal(ch workflow.ReceiveChannel, _ bool) {
var req schedspb.FullUpdateRequest
ch.Receive(s.ctx, &req)
func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {
if err := s.checkConflict(req.ConflictToken); err != nil {
s.logger.Warn("Update conflicted with concurrent change")
return
Expand All @@ -473,13 +483,6 @@ func (s *scheduler) handleUpdateSignal(ch workflow.ReceiveChannel, _ bool) {
s.incSeqNo()
}

func (s *scheduler) handlePatchSignal(ch workflow.ReceiveChannel, _ bool) {
var patch schedpb.SchedulePatch
ch.Receive(s.ctx, &patch)
s.logger.Info("Schedule patch", "patch", patch.String())
s.processPatch(&patch)
}

func (s *scheduler) handleRefreshSignal(ch workflow.ReceiveChannel, _ bool) {
ch.Receive(s.ctx, nil)
s.logger.Debug("got refresh signal")
Expand All @@ -488,6 +491,20 @@ func (s *scheduler) handleRefreshSignal(ch workflow.ReceiveChannel, _ bool) {
s.State.NeedRefresh = true
}

func (s *scheduler) processSignals() bool {
scheduleChanged := false
if s.pendingPatch != nil {
s.processPatch(s.pendingPatch)
s.pendingPatch = nil
}
if s.pendingUpdate != nil {
s.processUpdate(s.pendingUpdate)
s.pendingUpdate = nil
scheduleChanged = true
}
return scheduleChanged
}

func (s *scheduler) getFutureActionTimes(n int) []*time.Time {
if s.cspec == nil {
return nil
Expand Down
56 changes: 53 additions & 3 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (s *workflowSuite) runAcrossContinue(
s.True(s.env.IsWorkflowCompleted())
result := s.env.GetWorkflowError()
var canErr *workflow.ContinueAsNewError
s.True(errors.As(result, &canErr))
s.Require().True(errors.As(result, &canErr))

s.env.AssertExpectations(s.T())

Expand All @@ -299,9 +299,9 @@ func (s *workflowSuite) runAcrossContinue(
s.NoError(payloads.Decode(canErr.Input, &startArgs))
}
// check starts that we actually got
s.Equal(len(runs), len(gotRuns))
s.Require().Equal(len(runs), len(gotRuns))
for _, run := range runs {
s.True(run.start.Equal(gotRuns[run.id]))
s.Truef(run.start.Equal(gotRuns[run.id]), "%v != %v", run.start, gotRuns[run.id])
}
}
}
Expand Down Expand Up @@ -1246,6 +1246,56 @@ func (s *workflowSuite) TestUpdate() {
)
}

func (s *workflowSuite) TestUpdateNotRetroactive() {
s.runAcrossContinue(
[]workflowRun{
{
id: "myid-2022-06-01T01:00:00Z",
start: time.Date(2022, 6, 1, 1, 0, 0, 0, time.UTC),
end: time.Date(2022, 6, 1, 1, 0, 30, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "newid-2022-06-01T01:07:20Z",
start: time.Date(2022, 6, 1, 1, 7, 20, 0, time.UTC),
end: time.Date(2022, 6, 1, 1, 7, 30, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "newid-2022-06-01T01:07:40Z",
start: time.Date(2022, 6, 1, 1, 7, 40, 0, time.UTC),
end: time.Date(2022, 6, 1, 1, 7, 50, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
},
[]delayedCallback{
{
at: time.Date(2022, 6, 1, 1, 7, 10, 0, time.UTC),
f: func() {
s.env.SignalWorkflow(SignalNameUpdate, &schedspb.FullUpdateRequest{
Schedule: &schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: timestamp.DurationPtr(20 * time.Second),
}},
},
Action: s.defaultAction("newid"),
},
})
},
},
},
&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: timestamp.DurationPtr(1 * time.Hour),
}},
},
},
5,
)
}

func (s *workflowSuite) TestLimitedActions() {
s.runAcrossContinue(
[]workflowRun{
Expand Down

0 comments on commit 86966c5

Please sign in to comment.