Skip to content

Commit

Permalink
matching: metrics related fixes (#1908)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 committed May 29, 2019
1 parent dc5eee6 commit 58092d6
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 41 deletions.
20 changes: 19 additions & 1 deletion common/metrics/tags.go
Expand Up @@ -30,7 +30,8 @@ const (
instance = "instance"
domain = "domain"

domainAllValue = "all"
domainAllValue = "all"
domainUnknownValue = "_unknown_"
)

// Tag is an interface to define metrics tags
Expand Down Expand Up @@ -75,6 +76,23 @@ func (d domainAllTag) Value() string {
return domainAllValue
}

type domainUnknownTag struct{}

// DomainAllTag returns a new domain:unknown tag-value
func DomainUnknownTag() Tag {
return domainUnknownTag{}
}

// Key returns the key of the domain unknown tag
func (d domainUnknownTag) Key() string {
return domain
}

// Value returns the value of the domain unknown tag
func (d domainUnknownTag) Value() string {
return domainUnknownValue
}

type instanceTag struct {
value string
}
Expand Down
92 changes: 54 additions & 38 deletions service/matching/taskListManager.go
Expand Up @@ -103,16 +103,14 @@ type (

// Single task list in memory state
taskListManagerImpl struct {
sync.RWMutex

domainCache cache.DomainCache
taskListID *taskListID
logger log.Logger
metricsClient metrics.Client
domainName string
domainScope metrics.Scope // domain tagged metric scope
engine *matchingEngineImpl
config *taskListConfig
domainCache cache.DomainCache
taskListID *taskListID
logger log.Logger
metricsClient metrics.Client
domainNameValue atomic.Value
domainScopeValue atomic.Value // domain tagged metric scope
engine *matchingEngineImpl
config *taskListConfig

// pollerHistory stores poller which poll from this tasklist in last few minutes
pollerHistory *pollerHistory
Expand Down Expand Up @@ -247,10 +245,10 @@ func newTaskListManagerWithRateLimiter(
taskListKind = common.TaskListKindPtr(s.TaskListKindNormal)
}

domainName, domainScope := domainNameAndMetricScope(e.domainCache, taskList.domainID, e.metricsClient, metrics.MatchingTaskListMgrScope)
db := newTaskListDB(e.taskManager, taskList.domainID, taskList.taskListName, taskList.taskType, int(*taskListKind), e.logger)
tlMgr := &taskListManagerImpl{
domainCache: domainCache,
metricsClient: e.metricsClient,
engine: e,
taskBuffer: make(chan *persistence.TaskInfo, taskBufferSize),
notifyCh: make(chan struct{}, 1),
Expand All @@ -261,8 +259,6 @@ func newTaskListManagerWithRateLimiter(
taskListID: taskList,
logger: e.logger.WithTags(tag.WorkflowTaskListName(taskList.taskListName),
tag.WorkflowTaskListType(taskList.taskType)),
domainScope: domainScope,
domainName: domainName,
db: db,
taskAckManager: newAckManager(e.logger),
taskGC: newTaskGC(db, config),
Expand All @@ -274,6 +270,9 @@ func newTaskListManagerWithRateLimiter(
rateLimiter: rl,
taskListKind: int(*taskListKind),
}
tlMgr.domainNameValue.Store("")
tlMgr.domainScopeValue.Store(e.metricsClient.Scope(metrics.MatchingTaskListMgrScope, metrics.DomainUnknownTag()))
tlMgr.tryInitDomainNameAndScope()
tlMgr.taskWriter = newTaskWriter(tlMgr)
tlMgr.startWG.Add(1)
return tlMgr
Expand Down Expand Up @@ -379,31 +378,18 @@ func (c *taskListManagerImpl) GetTaskContext(
RunId: common.StringPtr(task.RunID),
}

c.tryInitDomainNameAndScope()
tCtx := &taskContext{
info: task,
workflowExecution: workflowExecution,
tlMgr: c,
syncResponseCh: result.C, // nil if task is loaded from persistence
queryTaskInfo: result.queryTask, // non-nil for query task
backlogCountHint: c.taskAckManager.getBacklogCountHint(),
domainName: c.domainName,
domainName: c.domainName(),
}
return tCtx, nil
}

// reload from domainCache in case it got empty result during construction
func (c *taskListManagerImpl) tryInitDomainNameAndScope() {
if len(c.domainName) == 0 {
domainName, scope := domainNameAndMetricScope(c.domainCache, c.taskListID.domainID, c.metricsClient, metrics.MatchingTaskListMgrScope)
if len(domainName) > 0 {
c.Lock()
c.domainName, c.domainScope = domainName, scope
c.Unlock()
}
}
}

func (c *taskListManagerImpl) persistAckLevel() error {
return c.db.UpdateState(c.taskAckManager.getAckLevel())
}
Expand Down Expand Up @@ -482,18 +468,18 @@ func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond
select {
case result := <-tasksForPoll:
if result.syncMatch {
c.domainScope.IncCounter(metrics.PollSuccessWithSyncCounter)
c.domainScope().IncCounter(metrics.PollSuccessWithSyncCounter)
}
c.domainScope.IncCounter(metrics.PollSuccessCounter)
c.domainScope().IncCounter(metrics.PollSuccessCounter)
return result, nil
case result := <-c.queryTasksForPoll:
if result.syncMatch {
c.domainScope.IncCounter(metrics.PollSuccessWithSyncCounter)
c.domainScope().IncCounter(metrics.PollSuccessWithSyncCounter)
}
c.domainScope.IncCounter(metrics.PollSuccessCounter)
c.domainScope().IncCounter(metrics.PollSuccessCounter)
return result, nil
case <-childCtx.Done():
c.domainScope.IncCounter(metrics.PollTimeoutCounter)
c.domainScope().IncCounter(metrics.PollTimeoutCounter)
return nil, ErrNoTasks
}
}
Expand All @@ -514,10 +500,10 @@ func (c *taskListManagerImpl) renewLeaseWithRetry() (taskListState, error) {
newState, err = c.db.RenewLease()
return
}
c.domainScope.IncCounter(metrics.LeaseRequestCounter)
c.domainScope().IncCounter(metrics.LeaseRequestCounter)
err := backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
if err != nil {
c.domainScope.IncCounter(metrics.LeaseFailureCounter)
c.domainScope().IncCounter(metrics.LeaseFailureCounter)
c.engine.unloadTaskList(c.taskListID)
return newState, err
}
Expand Down Expand Up @@ -608,7 +594,7 @@ func (c *taskListManagerImpl) trySyncMatch(task *persistence.TaskInfo) (*persist
if rsv.OK() { // if we were indeed given a reservation, return it before we bail out
rsv.Cancel()
}
c.domainScope.IncCounter(metrics.SyncThrottleCounter)
c.domainScope().IncCounter(metrics.SyncThrottleCounter)
return nil, errAddTasklistThrottled
}
time.Sleep(rsv.Delay())
Expand Down Expand Up @@ -641,7 +627,7 @@ func (c *taskListManagerImpl) executeWithRetry(
})

if _, ok := err.(*persistence.ConditionFailedError); ok {
c.domainScope.IncCounter(metrics.ConditionFailedErrorCounter)
c.domainScope().IncCounter(metrics.ConditionFailedErrorCounter)
c.logger.Debug(fmt.Sprintf("Stopping task list due to persistence condition failure. Err: %v", err))
c.Stop()
}
Expand Down Expand Up @@ -741,11 +727,41 @@ func (c *taskListManagerImpl) isTaskAddedRecently(lastAddTime time.Time) bool {
return time.Now().Sub(lastAddTime) <= c.config.MaxTasklistIdleTime()
}

// if domainCache return error, it will return "" as domainNamne and a scope without domainName tagged
func (c *taskListManagerImpl) domainScope() metrics.Scope {
scope := c.domainScopeValue.Load().(metrics.Scope)
if scope != nil {
return scope
}
c.tryInitDomainNameAndScope()
return c.domainScopeValue.Load().(metrics.Scope)
}

func (c *taskListManagerImpl) domainName() string {
name := c.domainNameValue.Load().(string)
if len(name) > 0 {
return name
}
c.tryInitDomainNameAndScope()
return c.domainNameValue.Load().(string)
}

// reload from domainCache in case it got empty result during construction
func (c *taskListManagerImpl) tryInitDomainNameAndScope() {
domainName := c.domainNameValue.Load().(string)
if len(domainName) == 0 {
domainName, scope := domainNameAndMetricScope(c.domainCache, c.taskListID.domainID, c.metricsClient, metrics.MatchingTaskListMgrScope)
if len(domainName) > 0 && scope != nil {
c.domainNameValue.Store(domainName)
c.domainScopeValue.Store(scope)
}
}
}

// if domainCache return error, it will return "" as domainName and a scope without domainName tagged
func domainNameAndMetricScope(cache cache.DomainCache, domainID string, client metrics.Client, scope int) (string, metrics.Scope) {
entry, err := cache.GetDomainByID(domainID)
if err != nil {
return "", client.Scope(scope)
return "", nil
}
return entry.GetInfo().Name, client.Scope(scope, metrics.DomainTag(entry.GetInfo().Name))
}
4 changes: 2 additions & 2 deletions service/matching/taskReader.go
Expand Up @@ -46,7 +46,7 @@ deliverBufferTasksLoop:
"Unable to add buffer task, rate limit failed, domainId: %s, tasklist: %s, error: %s",
c.taskListID.domainID, c.taskListID.taskListName, err.Error()),
)
c.domainScope.IncCounter(metrics.BufferThrottleCounter)
c.domainScope().IncCounter(metrics.BufferThrottleCounter)
// This is to prevent busy looping when throttling is set to 0
runtime.Gosched()
continue
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *taskListManagerImpl) addTasksToBuffer(
now := time.Now()
for _, t := range tasks {
if c.isTaskExpired(t, now) {
c.domainScope.IncCounter(metrics.ExpiredTasksCounter)
c.domainScope().IncCounter(metrics.ExpiredTasksCounter)
continue
}
if !c.addSingleTaskToBuffer(t, lastWriteTime, idleTimer) {
Expand Down

0 comments on commit 58092d6

Please sign in to comment.