From 64517d8326a75d7b9b219088e2e3210681fb672c Mon Sep 17 00:00:00 2001
From: Michael Snowden
Date: Mon, 19 Dec 2022 16:09:25 -0800
Subject: [PATCH] Fix errcheck in service/worker/

---
 service/worker/pernamespaceworker.go | 10 +++-
 service/worker/scheduler/workflow.go | 88 ++++++++++++++++++++--------
 service/worker/worker.go             |  4 +-
 3 files changed, 73 insertions(+), 29 deletions(-)

diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go
index e018b3fe525f..bfac55c0d71c 100644
--- a/service/worker/pernamespaceworker.go
+++ b/service/worker/pernamespaceworker.go
@@ -151,7 +151,10 @@ func (wm *perNamespaceWorkerManager) Start(
 
 	// this will call namespaceCallback with current namespaces
 	wm.namespaceRegistry.RegisterStateChangeCallback(wm, wm.namespaceCallback)
-	wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh)
+	err := wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh)
+	if err != nil {
+		wm.logger.Fatal("Unable to register membership listener", tag.Error(err))
+	}
 	go wm.membershipChangedListener()
 
 	wm.logger.Info("", tag.LifeCycleStarted)
@@ -169,7 +172,10 @@
 	wm.logger.Info("", tag.LifeCycleStopping)
 
 	wm.namespaceRegistry.UnregisterStateChangeCallback(wm)
-	wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey)
+	err := wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey)
+	if err != nil {
+		wm.logger.Fatal("Unable to unregister membership listener", tag.Error(err))
+	}
 	close(wm.membershipChangedCh)
 
 	wm.lock.Lock()
diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go
index 81191937a38e..4cf1ddda3a09 100644
--- a/service/worker/scheduler/workflow.go
+++ b/service/worker/scheduler/workflow.go
@@ -188,21 +188,34 @@ func (s *scheduler) run() error {
 			s.logger.Warn("Time went backwards", "from", t1, "to", t2)
 			t2 = t1
 		}
-		nextSleep := s.processTimeRange(
+		nextSleep, err := s.processTimeRange(
 			t1, t2,
 			// resolve this to the schedule's policy as late as possible
 			enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED,
 			false,
 		)
+		if err != nil {
+			return err
+		}
 		s.State.LastProcessedTime = timestamp.TimePtr(t2)
 		// handle signals after processing time range that just elapsed
 		scheduleChanged := s.processSignals()
 		if scheduleChanged {
 			// need to calculate sleep again
-			nextSleep = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
+			nextSleep, err = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
+			if err != nil {
+				return err
+			}
 		}
 		// try starting workflows in the buffer
-		for s.processBuffer() {
+		for {
+			b, err := s.processBuffer()
+			if err != nil {
+				return err
+			}
+			if !b {
+				break
+			}
 		}
 		s.updateMemoAndSearchAttributes()
 		// sleep returns on any of:
@@ -314,11 +327,11 @@ func (s *scheduler) processTimeRange(
 	t1, t2 time.Time,
 	overlapPolicy enumspb.ScheduleOverlapPolicy,
 	manual bool,
-) time.Duration {
+) (time.Duration, error) {
 	s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlapPolicy", overlapPolicy, "manual", manual)
 
 	if s.cspec == nil {
-		return invalidDuration
+		return invalidDuration, nil
 	}
 
 	catchupWindow := s.getCatchupWindow()
@@ -327,14 +340,16 @@ func (s *scheduler) processTimeRange(
 		// Run this logic in a SideEffect so that we can fix bugs there without breaking
 		// existing schedule workflows.
 		var next getNextTimeResult
-		workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
+		if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
 			return s.cspec.getNextTime(t1)
-		}).Get(&next)
+		}).Get(&next); err != nil {
+			return 0, err
+		}
 		t1 = next.Next
 		if t1.IsZero() {
-			return invalidDuration
+			return invalidDuration, nil
 		} else if t1.After(t2) {
-			return t1.Sub(t2)
+			return t1.Sub(t2), nil
 		}
 		if !manual && t2.Sub(t1) > catchupWindow {
 			s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
@@ -675,7 +690,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
 }
 
 // processBuffer should return true if there might be more work to do right now.
-func (s *scheduler) processBuffer() bool {
+func (s *scheduler) processBuffer() (bool, error) {
 	s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)
 
 	// TODO: consider doing this always and removing needRefresh? we only end up here without
@@ -692,7 +707,7 @@ func (s *scheduler) processBuffer() bool {
 	req := s.Schedule.Action.GetStartWorkflow()
 	if req == nil || len(s.State.BufferedStarts) == 0 {
 		s.State.BufferedStarts = nil
-		return false
+		return false, nil
 	}
 
 	isRunning := len(s.Info.RunningWorkflows) > 0
@@ -729,11 +744,15 @@ func (s *scheduler) processBuffer() bool {
 	// Terminate or cancel if required (terminate overrides cancel if both are present)
 	if action.needTerminate {
 		for _, ex := range s.Info.RunningWorkflows {
-			s.terminateWorkflow(ex)
+			if err := s.terminateWorkflow(ex); err != nil {
+				return false, err
+			}
 		}
 	} else if action.needCancel {
 		for _, ex := range s.Info.RunningWorkflows {
-			s.cancelWorkflow(ex)
+			if err := s.cancelWorkflow(ex); err != nil {
+				return false, err
+			}
 		}
 	}
 
@@ -749,7 +768,7 @@ func (s *scheduler) processBuffer() bool {
 		}
 	}
 
-	return tryAgain
+	return tryAgain, nil
 }
 
 func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) {
@@ -788,6 +807,10 @@ func (s *scheduler) startWorkflow(
 	}
 	ctx := workflow.WithLocalActivityOptions(s.ctx, options)
 
+	requestID, err := s.newUUIDString()
+	if err != nil {
+		return nil, err
+	}
 	req := &schedspb.StartWorkflowRequest{
 		Request: &workflowservice.StartWorkflowExecutionRequest{
 			WorkflowId: workflowID,
@@ -798,7 +821,7 @@ func (s *scheduler) startWorkflow(
 			WorkflowRunTimeout: newWorkflow.WorkflowRunTimeout,
 			WorkflowTaskTimeout: newWorkflow.WorkflowTaskTimeout,
 			Identity: s.identity(),
-			RequestId: s.newUUIDString(),
+			RequestId: requestID,
 			WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
 			RetryPolicy: newWorkflow.RetryPolicy,
 			Memo: newWorkflow.Memo,
@@ -809,7 +832,7 @@ func (s *scheduler) startWorkflow(
 		ContinuedFailure: s.State.ContinuedFailure,
 	}
 	var res schedspb.StartWorkflowResponse
-	err := workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
+	err = workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
 	if err != nil {
 		return nil, err
 	}
@@ -885,47 +908,60 @@ func (s *scheduler) startLongPollWatcher(ex *commonpb.WorkflowExecution) {
 	s.watchingWorkflowId = ex.WorkflowId
 }
 
-func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
+func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) error {
 	ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
+	requestID, err := s.newUUIDString()
+	if err != nil {
+		return err
+	}
 	areq := &schedspb.CancelWorkflowRequest{
-		RequestId: s.newUUIDString(),
+		RequestId: requestID,
 		Identity: s.identity(),
 		Execution: ex,
 		Reason: "cancelled by schedule overlap policy",
 	}
-	err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
+	err = workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
 	if err != nil {
 		s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err)
+		return err
 	}
 	// Note: the local activity has completed (or failed) here but the workflow might take time
 	// to close since a cancel is only a request.
+	return nil
 }
 
-func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
+func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) error {
 	ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
+	requestID, err := s.newUUIDString()
+	if err != nil {
+		return err
+	}
 	areq := &schedspb.TerminateWorkflowRequest{
-		RequestId: s.newUUIDString(),
+		RequestId: requestID,
 		Identity: s.identity(),
 		Execution: ex,
 		Reason: "terminated by schedule overlap policy",
 	}
-	err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
+	err = workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
 	if err != nil {
 		s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err)
 	}
+	return err
 }
 
-func (s *scheduler) newUUIDString() string {
+func (s *scheduler) newUUIDString() (string, error) {
 	if len(s.uuidBatch) == 0 {
-		workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
+		if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
 			out := make([]string, 10)
 			for i := range out {
 				out[i] = uuid.NewString()
 			}
 			return out
-		}).Get(&s.uuidBatch)
+		}).Get(&s.uuidBatch); err != nil {
+			return "", err
+		}
 	}
 	next := s.uuidBatch[0]
 	s.uuidBatch = s.uuidBatch[1:]
-	return next
+	return next, nil
 }
diff --git a/service/worker/worker.go b/service/worker/worker.go
index df811d6d373e..0ad6854c3537 100644
--- a/service/worker/worker.go
+++ b/service/worker/worker.go
@@ -98,7 +98,9 @@ func (wm *workerManager) Start() {
 	}
 
 	for _, w := range wm.workers {
-		w.Start()
+		if err := w.Start(); err != nil {
+			wm.logger.Fatal("Unable to start worker", tag.Error(err))
+		}
 	}
 
 	wm.logger.Info("", tag.ComponentWorkerManager, tag.LifeCycleStarted)
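
Note on the two handling strategies used above: at process start-up (perNamespaceWorkerManager and workerManager) a failed registration or worker start is treated as fatal, while inside workflow code (scheduler/workflow.go) errors from SideEffect.Get, local activities, and helpers such as newUUIDString are returned to the caller. The following minimal, self-contained Go sketch illustrates that split; the names startupComponent and workflowStep are hypothetical and are not part of this patch or the Temporal code base.

// Illustrative sketch only; startupComponent and workflowStep are hypothetical
// names, not code from this patch or from the Temporal repository.
package main

import (
	"errors"
	"fmt"
	"log"
)

// startupComponent mirrors the handling added in perNamespaceWorkerManager.Start,
// perNamespaceWorkerManager.Stop, and workerManager.Start: if wiring up a listener
// or starting a worker fails, the process cannot run correctly, so the error is
// logged as fatal (standard library log here stands in for logger.Fatal with tag.Error).
func startupComponent(register func() error) {
	if err := register(); err != nil {
		log.Fatalf("unable to register membership listener: %v", err)
	}
}

// workflowStep mirrors the handling added in scheduler/workflow.go: the error is
// propagated to the caller instead of being silently dropped, so the scheduler
// workflow's run loop decides how to surface it.
func workflowStep(sideEffect func() (string, error)) (string, error) {
	requestID, err := sideEffect()
	if err != nil {
		return "", fmt.Errorf("generating request ID: %w", err)
	}
	return requestID, nil
}

func main() {
	if _, err := workflowStep(func() (string, error) {
		return "", errors.New("decode failed")
	}); err != nil {
		fmt.Println("workflow step:", err) // propagated, not ignored
	}
	startupComponent(func() error { return nil }) // nil error: no fatal exit
	fmt.Println("startup ok")
}

Running errcheck over the pre-patch packages (for example, errcheck ./service/worker/...) or golangci-lint with the errcheck linter enabled reports each ignored return value; the patch resolves every report in one of the two ways sketched above.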