Skip to content

Commit

Permalink
Fix errcheck in service/worker/
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 20, 2022
1 parent edd3f0a commit 64517d8
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 29 deletions.
10 changes: 8 additions & 2 deletions service/worker/pernamespaceworker.go
Expand Up @@ -151,7 +151,10 @@ func (wm *perNamespaceWorkerManager) Start(
// this will call namespaceCallback with current namespaces
wm.namespaceRegistry.RegisterStateChangeCallback(wm, wm.namespaceCallback)

wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh)
err := wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh)
if err != nil {
wm.logger.Fatal("Unable to register membership listener", tag.Error(err))
}
go wm.membershipChangedListener()

wm.logger.Info("", tag.LifeCycleStarted)
Expand All @@ -169,7 +172,10 @@ func (wm *perNamespaceWorkerManager) Stop() {
wm.logger.Info("", tag.LifeCycleStopping)

wm.namespaceRegistry.UnregisterStateChangeCallback(wm)
wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey)
err := wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey)
if err != nil {
wm.logger.Fatal("Unable to unregister membership listener", tag.Error(err))
}
close(wm.membershipChangedCh)

wm.lock.Lock()
Expand Down
88 changes: 62 additions & 26 deletions service/worker/scheduler/workflow.go
Expand Up @@ -188,21 +188,34 @@ func (s *scheduler) run() error {
s.logger.Warn("Time went backwards", "from", t1, "to", t2)
t2 = t1
}
nextSleep := s.processTimeRange(
nextSleep, err := s.processTimeRange(
t1, t2,
// resolve this to the schedule's policy as late as possible
enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED,
false,
)
if err != nil {
return err
}
s.State.LastProcessedTime = timestamp.TimePtr(t2)
// handle signals after processing time range that just elapsed
scheduleChanged := s.processSignals()
if scheduleChanged {
// need to calculate sleep again
nextSleep = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
nextSleep, err = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
if err != nil {
return err
}
}
// try starting workflows in the buffer
for s.processBuffer() {
for {
b, err := s.processBuffer()
if err != nil {
return err
}
if !b {
break
}
}
s.updateMemoAndSearchAttributes()
// sleep returns on any of:
Expand Down Expand Up @@ -314,11 +327,11 @@ func (s *scheduler) processTimeRange(
t1, t2 time.Time,
overlapPolicy enumspb.ScheduleOverlapPolicy,
manual bool,
) time.Duration {
) (time.Duration, error) {
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlapPolicy", overlapPolicy, "manual", manual)

if s.cspec == nil {
return invalidDuration
return invalidDuration, nil
}

catchupWindow := s.getCatchupWindow()
Expand All @@ -327,14 +340,16 @@ func (s *scheduler) processTimeRange(
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
var next getNextTimeResult
workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(t1)
}).Get(&next)
}).Get(&next); err != nil {
return 0, err
}
t1 = next.Next
if t1.IsZero() {
return invalidDuration
return invalidDuration, nil
} else if t1.After(t2) {
return t1.Sub(t2)
return t1.Sub(t2), nil
}
if !manual && t2.Sub(t1) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
Expand Down Expand Up @@ -675,7 +690,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
}

// processBuffer should return true if there might be more work to do right now.
func (s *scheduler) processBuffer() bool {
func (s *scheduler) processBuffer() (bool, error) {
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)

// TODO: consider doing this always and removing needRefresh? we only end up here without
Expand All @@ -692,7 +707,7 @@ func (s *scheduler) processBuffer() bool {
req := s.Schedule.Action.GetStartWorkflow()
if req == nil || len(s.State.BufferedStarts) == 0 {
s.State.BufferedStarts = nil
return false
return false, nil
}

isRunning := len(s.Info.RunningWorkflows) > 0
Expand Down Expand Up @@ -729,11 +744,15 @@ func (s *scheduler) processBuffer() bool {
// Terminate or cancel if required (terminate overrides cancel if both are present)
if action.needTerminate {
for _, ex := range s.Info.RunningWorkflows {
s.terminateWorkflow(ex)
if err := s.terminateWorkflow(ex); err != nil {
return false, err
}
}
} else if action.needCancel {
for _, ex := range s.Info.RunningWorkflows {
s.cancelWorkflow(ex)
if err := s.cancelWorkflow(ex); err != nil {
return false, err
}
}
}

Expand All @@ -749,7 +768,7 @@ func (s *scheduler) processBuffer() bool {
}
}

return tryAgain
return tryAgain, nil
}

func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) {
Expand Down Expand Up @@ -788,6 +807,10 @@ func (s *scheduler) startWorkflow(
}
ctx := workflow.WithLocalActivityOptions(s.ctx, options)

requestID, err := s.newUUIDString()
if err != nil {
return nil, err
}
req := &schedspb.StartWorkflowRequest{
Request: &workflowservice.StartWorkflowExecutionRequest{
WorkflowId: workflowID,
Expand All @@ -798,7 +821,7 @@ func (s *scheduler) startWorkflow(
WorkflowRunTimeout: newWorkflow.WorkflowRunTimeout,
WorkflowTaskTimeout: newWorkflow.WorkflowTaskTimeout,
Identity: s.identity(),
RequestId: s.newUUIDString(),
RequestId: requestID,
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
RetryPolicy: newWorkflow.RetryPolicy,
Memo: newWorkflow.Memo,
Expand All @@ -809,7 +832,7 @@ func (s *scheduler) startWorkflow(
ContinuedFailure: s.State.ContinuedFailure,
}
var res schedspb.StartWorkflowResponse
err := workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
err = workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -885,47 +908,60 @@ func (s *scheduler) startLongPollWatcher(ex *commonpb.WorkflowExecution) {
s.watchingWorkflowId = ex.WorkflowId
}

func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) error {
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
requestID, err := s.newUUIDString()
if err != nil {
return err
}
areq := &schedspb.CancelWorkflowRequest{
RequestId: s.newUUIDString(),
RequestId: requestID,
Identity: s.identity(),
Execution: ex,
Reason: "cancelled by schedule overlap policy",
}
err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
err = workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err)
return err
}
// Note: the local activity has completed (or failed) here but the workflow might take time
// to close since a cancel is only a request.
return nil
}

func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) error {
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
requestID, err := s.newUUIDString()
if err != nil {
return err
}
areq := &schedspb.TerminateWorkflowRequest{
RequestId: s.newUUIDString(),
RequestId: requestID,
Identity: s.identity(),
Execution: ex,
Reason: "terminated by schedule overlap policy",
}
err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
err = workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err)
}
return err
}

func (s *scheduler) newUUIDString() string {
func (s *scheduler) newUUIDString() (string, error) {
if len(s.uuidBatch) == 0 {
workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
out := make([]string, 10)
for i := range out {
out[i] = uuid.NewString()
}
return out
}).Get(&s.uuidBatch)
}).Get(&s.uuidBatch); err != nil {
return "", err
}
}
next := s.uuidBatch[0]
s.uuidBatch = s.uuidBatch[1:]
return next
return next, nil
}
4 changes: 3 additions & 1 deletion service/worker/worker.go
Expand Up @@ -98,7 +98,9 @@ func (wm *workerManager) Start() {
}

for _, w := range wm.workers {
w.Start()
if err := w.Start(); err != nil {
wm.logger.Fatal("Unable to start worker", tag.Error(err))
}
}

wm.logger.Info("", tag.ComponentWorkerManager, tag.LifeCycleStarted)
Expand Down

0 comments on commit 64517d8

Please sign in to comment.