Skip to content

Commit

Permalink
Partial revert #3731, fix errcheck, add comments (#3787)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr authored Jan 20, 2023
1 parent e0081fd commit 82c958d
Showing 1 changed file with 38 additions and 62 deletions.
100 changes: 38 additions & 62 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,34 +188,22 @@ func (s *scheduler) run() error {
s.logger.Warn("Time went backwards", "from", t1, "to", t2)
t2 = t1
}
nextSleep, err := s.processTimeRange(
nextSleep := s.processTimeRange(
t1, t2,
// resolve this to the schedule's policy as late as possible
enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED,
false,
)
if err != nil {
return err
}
s.State.LastProcessedTime = timestamp.TimePtr(t2)
// handle signals after processing time range that just elapsed
scheduleChanged := s.processSignals()
if scheduleChanged {
// need to calculate sleep again
nextSleep, err = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
if err != nil {
return err
}
nextSleep = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
}
// try starting workflows in the buffer
for {
b, err := s.processBuffer()
if err != nil {
return err
}
if !b {
break
}
//nolint:revive
for s.processBuffer() {
}
s.updateMemoAndSearchAttributes()
// sleep returns on any of:
Expand Down Expand Up @@ -327,11 +315,11 @@ func (s *scheduler) processTimeRange(
t1, t2 time.Time,
overlapPolicy enumspb.ScheduleOverlapPolicy,
manual bool,
) (time.Duration, error) {
) time.Duration {
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlapPolicy", overlapPolicy, "manual", manual)

if s.cspec == nil {
return invalidDuration, nil
return invalidDuration
}

catchupWindow := s.getCatchupWindow()
Expand All @@ -340,16 +328,14 @@ func (s *scheduler) processTimeRange(
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
var next getNextTimeResult
if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(t1)
}).Get(&next); err != nil {
return 0, err
}
}).Get(&next))
t1 = next.Next
if t1.IsZero() {
return invalidDuration, nil
return invalidDuration
} else if t1.After(t2) {
return t1.Sub(t2), nil
return t1.Sub(t2)
}
if !manual && t2.Sub(t1) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
Expand Down Expand Up @@ -690,7 +676,8 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
}

// processBuffer should return true if there might be more work to do right now.
func (s *scheduler) processBuffer() (bool, error) {
//nolint:revive
func (s *scheduler) processBuffer() bool {
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)

// TODO: consider doing this always and removing needRefresh? we only end up here without
Expand All @@ -707,7 +694,7 @@ func (s *scheduler) processBuffer() (bool, error) {
req := s.Schedule.Action.GetStartWorkflow()
if req == nil || len(s.State.BufferedStarts) == 0 {
s.State.BufferedStarts = nil
return false, nil
return false
}

isRunning := len(s.Info.RunningWorkflows) > 0
Expand Down Expand Up @@ -744,15 +731,11 @@ func (s *scheduler) processBuffer() (bool, error) {
// Terminate or cancel if required (terminate overrides cancel if both are present)
if action.needTerminate {
for _, ex := range s.Info.RunningWorkflows {
if err := s.terminateWorkflow(ex); err != nil {
return false, err
}
s.terminateWorkflow(ex)
}
} else if action.needCancel {
for _, ex := range s.Info.RunningWorkflows {
if err := s.cancelWorkflow(ex); err != nil {
return false, err
}
s.cancelWorkflow(ex)
}
}

Expand All @@ -768,7 +751,7 @@ func (s *scheduler) processBuffer() (bool, error) {
}
}

return tryAgain, nil
return tryAgain
}

func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) {
Expand Down Expand Up @@ -807,10 +790,6 @@ func (s *scheduler) startWorkflow(
}
ctx := workflow.WithLocalActivityOptions(s.ctx, options)

requestID, err := s.newUUIDString()
if err != nil {
return nil, err
}
req := &schedspb.StartWorkflowRequest{
Request: &workflowservice.StartWorkflowExecutionRequest{
WorkflowId: workflowID,
Expand All @@ -821,7 +800,7 @@ func (s *scheduler) startWorkflow(
WorkflowRunTimeout: newWorkflow.WorkflowRunTimeout,
WorkflowTaskTimeout: newWorkflow.WorkflowTaskTimeout,
Identity: s.identity(),
RequestId: requestID,
RequestId: s.newUUIDString(),
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
RetryPolicy: newWorkflow.RetryPolicy,
Memo: newWorkflow.Memo,
Expand All @@ -832,7 +811,7 @@ func (s *scheduler) startWorkflow(
ContinuedFailure: s.State.ContinuedFailure,
}
var res schedspb.StartWorkflowResponse
err = workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
err := workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -908,60 +887,57 @@ func (s *scheduler) startLongPollWatcher(ex *commonpb.WorkflowExecution) {
s.watchingWorkflowId = ex.WorkflowId
}

func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) error {
func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
requestID, err := s.newUUIDString()
if err != nil {
return err
}
areq := &schedspb.CancelWorkflowRequest{
RequestId: requestID,
RequestId: s.newUUIDString(),
Identity: s.identity(),
Execution: ex,
Reason: "cancelled by schedule overlap policy",
}
err = workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err)
return err
}
// Note: the local activity has completed (or failed) here but the workflow might take time
// to close since a cancel is only a request.
return nil
// If this failed, that's okay, we'll try it again the next time we try to take an action.
}

func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) error {
func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
requestID, err := s.newUUIDString()
if err != nil {
return err
}
areq := &schedspb.TerminateWorkflowRequest{
RequestId: requestID,
RequestId: s.newUUIDString(),
Identity: s.identity(),
Execution: ex,
Reason: "terminated by schedule overlap policy",
}
err = workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err)
}
return err
// Note: the local activity has completed (or failed) here but we'll still wait until we
// observe the workflow close (with a watcher) to start the next one.
// If this failed, that's okay, we'll try it again the next time we try to take an action.
}

func (s *scheduler) newUUIDString() (string, error) {
func (s *scheduler) newUUIDString() string {
if len(s.uuidBatch) == 0 {
if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
out := make([]string, 10)
for i := range out {
out[i] = uuid.NewString()
}
return out
}).Get(&s.uuidBatch); err != nil {
return "", err
}
}).Get(&s.uuidBatch))
}
next := s.uuidBatch[0]
s.uuidBatch = s.uuidBatch[1:]
return next, nil
return next
}

// panicIfErr panics with err as the panic value when err is non-nil and
// returns normally otherwise. In this file it wraps the error returned by
// workflow.SideEffect(...).Get(...), so callers do not need an error return.
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}

0 comments on commit 82c958d

Please sign in to comment.