Skip to content

Commit

Permalink
Emit estimated matching task lag metric (#2605)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 14, 2022
1 parent 3a3057e commit 9732f77
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 8 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Expand Up @@ -2131,6 +2131,7 @@ const (
TaskQueueStoppedCounter
TaskWriteThrottlePerTaskQueueCounter
TaskWriteLatencyPerTaskQueue
TaskLagPerTaskQueueGauge

NumMatchingMetrics
)
Expand Down Expand Up @@ -2588,6 +2589,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskQueueStoppedCounter: NewCounterDef("task_queue_stopped"),
TaskWriteThrottlePerTaskQueueCounter: NewRollupCounterDef("task_write_throttle_count_per_tl", "task_write_throttle_count"),
TaskWriteLatencyPerTaskQueue: NewRollupTimerDef("task_write_latency_per_tl", "task_write_latency"),
TaskLagPerTaskQueueGauge: NewGaugeDef("task_lag_per_tl"),
},
Worker: {
ReplicatorMessages: NewCounterDef("replicator_messages"),
Expand Down
5 changes: 5 additions & 0 deletions common/metrics/tags.go
Expand Up @@ -29,6 +29,7 @@ import (
"strconv"
"strings"

"go.temporal.io/api/enums/v1"
enumspb "go.temporal.io/api/enums/v1"
)

Expand Down Expand Up @@ -135,6 +136,10 @@ func TaskQueueTag(value string) Tag {
return &tagImpl{key: taskQueue, value: sanitizer.Value(value)}
}

func TaskQueueTypeTag(tqType enums.TaskQueueType) Tag {
return &tagImpl{key: TaskTypeTagName, value: tqType.String()}
}

// WorkflowTypeTag returns a new workflow type tag.
func WorkflowTypeTag(value string) Tag {
if len(value) == 0 {
Expand Down
5 changes: 4 additions & 1 deletion common/metrics/temporal_queues.go
Expand Up @@ -42,5 +42,8 @@ func GetPerTaskQueueScope(
metricTaskQueueName = unknownValue
}

return baseScope.Tagged(NamespaceTag(namespaceName), TaskQueueTag(metricTaskQueueName))
return baseScope.Tagged(
NamespaceTag(namespaceName),
TaskQueueTag(metricTaskQueueName),
)
}
8 changes: 6 additions & 2 deletions service/history/historyEngine.go
Expand Up @@ -1376,8 +1376,12 @@ func (e *historyEngineImpl) RecordActivityTaskStarted(
namespaceName := namespaceEntry.Name()
taskQueueName := ai.GetTaskQueue()

metrics.GetPerTaskQueueScope(metricsScope, namespaceName.String(), taskQueueName, enumspb.TASK_QUEUE_KIND_NORMAL).
Tagged(metrics.TaskTypeTag("activity")).
metrics.GetPerTaskQueueScope(
metricsScope,
namespaceName.String(),
taskQueueName,
enumspb.TASK_QUEUE_KIND_NORMAL,
).Tagged(metrics.TaskQueueTypeTag(enumspb.TASK_QUEUE_TYPE_ACTIVITY)).
RecordTimer(metrics.TaskScheduleToStartLatency, scheduleToStartLatency)

response.StartedTime = ai.StartedTime
Expand Down
8 changes: 6 additions & 2 deletions service/history/workflowTaskHandlerCallbacks.go
Expand Up @@ -234,8 +234,12 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted(
workflowScheduleToStartLatency := workflowTask.StartedTime.Sub(*workflowTask.ScheduledTime)
namespaceName := namespaceEntry.Name()
taskQueue := workflowTask.TaskQueue
metrics.GetPerTaskQueueScope(metricsScope, namespaceName.String(), taskQueue.GetName(), taskQueue.GetKind()).
Tagged(metrics.TaskTypeTag("workflow")).
metrics.GetPerTaskQueueScope(
metricsScope,
namespaceName.String(),
taskQueue.GetName(),
taskQueue.GetKind(),
).Tagged(metrics.TaskQueueTypeTag(enumspb.TASK_QUEUE_TYPE_WORKFLOW)).
RecordTimer(metrics.TaskScheduleToStartLatency, workflowScheduleToStartLatency)

resp, err = handler.createRecordWorkflowTaskStartedResponse(mutableState, workflowTask, req.PollRequest.GetIdentity())
Expand Down
2 changes: 1 addition & 1 deletion service/matching/matchingEngine_test.go
Expand Up @@ -844,7 +844,7 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
}

time.Sleep(20 * time.Millisecond) // So any buffer tasks from 0 rps get picked up
syncCtr := scope.Snapshot().Counters()["test.sync_throttle_count_per_tl+namespace="+matchingTestNamespace+",operation=TaskQueueMgr,service_name=matching,taskqueue=makeToast"]
syncCtr := scope.Snapshot().Counters()["test.sync_throttle_count_per_tl+namespace="+matchingTestNamespace+",operation=TaskQueueMgr,service_name=matching,task_type=Activity,taskqueue=makeToast"]
s.Equal(1, int(syncCtr.Value())) // Check times zero rps is set = throttle counter
s.EqualValues(1, s.taskManager.getCreateTaskCount(tlID)) // Check times zero rps is set = Tasks stored in persistence
s.EqualValues(0, s.taskManager.getTaskCount(tlID))
Expand Down
2 changes: 1 addition & 1 deletion service/matching/taskQueueManager.go
Expand Up @@ -174,7 +174,7 @@ func newTaskQueueManager(
nsName.String(),
taskQueue.name,
taskQueueKind,
)
).Tagged(metrics.TaskQueueTypeTag(taskQueue.taskType))
tlMgr := &taskQueueManagerImpl{
status: common.DaemonStatusInitialized,
namespaceRegistry: e.namespaceRegistry,
Expand Down
11 changes: 10 additions & 1 deletion service/matching/taskReader.go
Expand Up @@ -268,7 +268,9 @@ func (tr *taskReader) addSingleTaskToBuffer(
}

func (tr *taskReader) persistAckLevel() error {
return tr.tlMgr.db.UpdateState(tr.tlMgr.taskAckManager.getAckLevel())
ackLevel := tr.tlMgr.taskAckManager.getAckLevel()
tr.emitTaskLagMetric(ackLevel)
return tr.tlMgr.db.UpdateState(ackLevel)
}

func (tr *taskReader) isTaskAddedRecently(lastAddTime time.Time) bool {
Expand All @@ -282,3 +284,10 @@ func (tr *taskReader) logger() log.Logger {
func (tr *taskReader) scope() metrics.Scope {
return tr.tlMgr.metricScope
}

func (tr *taskReader) emitTaskLagMetric(ackLevel int64) {
// note: this metric is only an estimation for the lag.
// taskID in DB may not be continuous, especially when task list ownership changes.
maxReadLevel := tr.tlMgr.taskWriter.GetMaxReadLevel()
tr.scope().UpdateGauge(metrics.TaskLagPerTaskQueueGauge, float64(maxReadLevel-ackLevel))
}

0 comments on commit 9732f77

Please sign in to comment.