From 9732f77ebedb0bff2cb314f9c02127f7068dcc53 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 14 Mar 2022 11:29:15 -0700 Subject: [PATCH] Emit estimated matching task lag metric (#2605) --- common/metrics/defs.go | 2 ++ common/metrics/tags.go | 5 +++++ common/metrics/temporal_queues.go | 5 ++++- service/history/historyEngine.go | 8 ++++++-- service/history/workflowTaskHandlerCallbacks.go | 8 ++++++-- service/matching/matchingEngine_test.go | 2 +- service/matching/taskQueueManager.go | 2 +- service/matching/taskReader.go | 11 ++++++++++- 8 files changed, 35 insertions(+), 8 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 37327f95dfb..afad883aa6b 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2131,6 +2131,7 @@ const ( TaskQueueStoppedCounter TaskWriteThrottlePerTaskQueueCounter TaskWriteLatencyPerTaskQueue + TaskLagPerTaskQueueGauge NumMatchingMetrics ) @@ -2588,6 +2589,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ TaskQueueStoppedCounter: NewCounterDef("task_queue_stopped"), TaskWriteThrottlePerTaskQueueCounter: NewRollupCounterDef("task_write_throttle_count_per_tl", "task_write_throttle_count"), TaskWriteLatencyPerTaskQueue: NewRollupTimerDef("task_write_latency_per_tl", "task_write_latency"), + TaskLagPerTaskQueueGauge: NewGaugeDef("task_lag_per_tl"), }, Worker: { ReplicatorMessages: NewCounterDef("replicator_messages"), diff --git a/common/metrics/tags.go b/common/metrics/tags.go index ad2aa2b5183..1a421606b27 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -29,6 +29,7 @@ import ( "strconv" "strings" + "go.temporal.io/api/enums/v1" enumspb "go.temporal.io/api/enums/v1" ) @@ -135,6 +136,10 @@ func TaskQueueTag(value string) Tag { return &tagImpl{key: taskQueue, value: sanitizer.Value(value)} } +func TaskQueueTypeTag(tqType enums.TaskQueueType) Tag { + return &tagImpl{key: TaskTypeTagName, value: tqType.String()} +} + // WorkflowTypeTag returns a new workflow type tag. func WorkflowTypeTag(value string) Tag { if len(value) == 0 { diff --git a/common/metrics/temporal_queues.go b/common/metrics/temporal_queues.go index f74529c932d..4915edd40d5 100644 --- a/common/metrics/temporal_queues.go +++ b/common/metrics/temporal_queues.go @@ -42,5 +42,8 @@ func GetPerTaskQueueScope( metricTaskQueueName = unknownValue } - return baseScope.Tagged(NamespaceTag(namespaceName), TaskQueueTag(metricTaskQueueName)) + return baseScope.Tagged( + NamespaceTag(namespaceName), + TaskQueueTag(metricTaskQueueName), + ) } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 0fc114498a5..baa205991a5 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -1376,8 +1376,12 @@ func (e *historyEngineImpl) RecordActivityTaskStarted( namespaceName := namespaceEntry.Name() taskQueueName := ai.GetTaskQueue() - metrics.GetPerTaskQueueScope(metricsScope, namespaceName.String(), taskQueueName, enumspb.TASK_QUEUE_KIND_NORMAL). - Tagged(metrics.TaskTypeTag("activity")). + metrics.GetPerTaskQueueScope( + metricsScope, + namespaceName.String(), + taskQueueName, + enumspb.TASK_QUEUE_KIND_NORMAL, + ).Tagged(metrics.TaskQueueTypeTag(enumspb.TASK_QUEUE_TYPE_ACTIVITY)). RecordTimer(metrics.TaskScheduleToStartLatency, scheduleToStartLatency) response.StartedTime = ai.StartedTime diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 5a42224f081..dc12f106c35 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -234,8 +234,12 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( workflowScheduleToStartLatency := workflowTask.StartedTime.Sub(*workflowTask.ScheduledTime) namespaceName := namespaceEntry.Name() taskQueue := workflowTask.TaskQueue - metrics.GetPerTaskQueueScope(metricsScope, namespaceName.String(), taskQueue.GetName(), taskQueue.GetKind()). - Tagged(metrics.TaskTypeTag("workflow")). + metrics.GetPerTaskQueueScope( + metricsScope, + namespaceName.String(), + taskQueue.GetName(), + taskQueue.GetKind(), + ).Tagged(metrics.TaskQueueTypeTag(enumspb.TASK_QUEUE_TYPE_WORKFLOW)). RecordTimer(metrics.TaskScheduleToStartLatency, workflowScheduleToStartLatency) resp, err = handler.createRecordWorkflowTaskStartedResponse(mutableState, workflowTask, req.PollRequest.GetIdentity()) diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index 8e6f14895e0..780f64c9a10 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -844,7 +844,7 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() { } time.Sleep(20 * time.Millisecond) // So any buffer tasks from 0 rps get picked up - syncCtr := scope.Snapshot().Counters()["test.sync_throttle_count_per_tl+namespace="+matchingTestNamespace+",operation=TaskQueueMgr,service_name=matching,taskqueue=makeToast"] + syncCtr := scope.Snapshot().Counters()["test.sync_throttle_count_per_tl+namespace="+matchingTestNamespace+",operation=TaskQueueMgr,service_name=matching,task_type=Activity,taskqueue=makeToast"] s.Equal(1, int(syncCtr.Value())) // Check times zero rps is set = throttle counter s.EqualValues(1, s.taskManager.getCreateTaskCount(tlID)) // Check times zero rps is set = Tasks stored in persistence s.EqualValues(0, s.taskManager.getTaskCount(tlID)) diff --git a/service/matching/taskQueueManager.go b/service/matching/taskQueueManager.go index e7b59dc8634..a875669c59b 100644 --- a/service/matching/taskQueueManager.go +++ b/service/matching/taskQueueManager.go @@ -174,7 +174,7 @@ func newTaskQueueManager( nsName.String(), taskQueue.name, taskQueueKind, - ) + ).Tagged(metrics.TaskQueueTypeTag(taskQueue.taskType)) tlMgr := &taskQueueManagerImpl{ status: common.DaemonStatusInitialized, namespaceRegistry: e.namespaceRegistry, diff --git a/service/matching/taskReader.go b/service/matching/taskReader.go index 248c7d3ccfc..3c0b80a65cd 100644 --- a/service/matching/taskReader.go +++ b/service/matching/taskReader.go @@ -268,7 +268,9 @@ func (tr *taskReader) addSingleTaskToBuffer( } func (tr *taskReader) persistAckLevel() error { - return tr.tlMgr.db.UpdateState(tr.tlMgr.taskAckManager.getAckLevel()) + ackLevel := tr.tlMgr.taskAckManager.getAckLevel() + tr.emitTaskLagMetric(ackLevel) + return tr.tlMgr.db.UpdateState(ackLevel) } func (tr *taskReader) isTaskAddedRecently(lastAddTime time.Time) bool { @@ -282,3 +284,10 @@ func (tr *taskReader) logger() log.Logger { func (tr *taskReader) scope() metrics.Scope { return tr.tlMgr.metricScope } + +func (tr *taskReader) emitTaskLagMetric(ackLevel int64) { + // note: this metric is only an estimation for the lag. + // taskID in DB may not be continuous, especially when task list ownership changes. + maxReadLevel := tr.tlMgr.taskWriter.GetMaxReadLevel() + tr.scope().UpdateGauge(metrics.TaskLagPerTaskQueueGauge, float64(maxReadLevel-ackLevel)) +}