Skip to content

Commit

Permalink
[poller autoscaler] fix logic to identify empty tasks (#1192)
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng committed Sep 28, 2022
1 parent db5eb67 commit aa89bb7
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 10 deletions.
1 change: 1 addition & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ func getFeatureFlags(options *ClientOptions) FeatureFlags {
if options != nil {
return FeatureFlags{
WorkflowExecutionAlreadyCompletedErrorEnabled: options.FeatureFlags.WorkflowExecutionAlreadyCompletedErrorEnabled,
PollerAutoScalerEnabled: options.FeatureFlags.PollerAutoScalerEnabled,
}
}
return FeatureFlags{}
Expand Down
28 changes: 21 additions & 7 deletions internal/internal_poller_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,31 @@ func (m *pollerUsageEstimator) Reset() {

// CollectUsage counts past poll results to estimate autoscaler.Usages
func (m *pollerUsageEstimator) CollectUsage(data interface{}) error {
switch v := data.(type) {
case *polledTask:
if v == nil { // no-task poll
m.atomicBits.Add(1 << 32)
} else {
m.atomicBits.Add(1)
}
isEmpty, err := isTaskEmpty(data)
if err != nil {
return err
}
if isEmpty { // no-task poll
m.atomicBits.Add(1 << 32)
} else {
m.atomicBits.Add(1)
}
return nil
}

func isTaskEmpty(task interface{}) (bool, error) {
switch t := task.(type) {
case *workflowTask:
return t == nil || t.task == nil, nil
case *activityTask:
return t == nil || t.task == nil, nil
case *localActivityTask:
return t == nil || t.workflowTask == nil, nil
default:
return false, errors.New("unknown task type")
}
}

// Estimate is based on past poll counts
func (m *pollerUsageEstimator) Estimate() (autoscaler.Usages, error) {
bits := m.atomicBits.Load()
Expand Down
5 changes: 3 additions & 2 deletions internal/internal_poller_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package internal
import (
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
s "go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/internal/common/autoscaler"
"go.uber.org/zap/zaptest"
"math/rand"
Expand Down Expand Up @@ -270,10 +271,10 @@ type unrelatedPolledTask struct{}
func generateRandomPollResults(noTaskPoll, taskPoll, unrelated int) <-chan interface{} {
var result []interface{}
for i := 0; i < noTaskPoll; i++ {
result = append(result, (*polledTask)(nil))
result = append(result, &activityTask{})
}
for i := 0; i < taskPoll; i++ {
result = append(result, &polledTask{})
result = append(result, &activityTask{task: &s.PollForActivityTaskResponse{}})
}
for i := 0; i < unrelated; i++ {
result = append(result, &unrelatedPolledTask{})
Expand Down
4 changes: 3 additions & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ func (bw *baseWorker) pollTask() {
} else {
if bw.pollerAutoScaler != nil {
if pErr := bw.pollerAutoScaler.CollectUsage(task); pErr != nil {
bw.logger.Warn("poller auto scaler collect usage error", zap.Error(pErr))
bw.logger.Sugar().Warnw("poller auto scaler collect usage error",
"error", pErr,
"task", task)
}
}
bw.retrier.Succeeded()
Expand Down

0 comments on commit aa89bb7

Please sign in to comment.