Skip to content

Commit

Permalink
Add task count metric (#2869)
Browse files Browse the repository at this point in the history
  • Loading branch information
nagl-temporal committed May 20, 2022
1 parent d36291f commit 5eda907
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 1 deletion.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2125,6 +2125,7 @@ const (
SignalInfoCount
RequestCancelInfoCount
BufferedEventsCount
TaskCount
WorkflowRetryBackoffTimerCount
WorkflowCronBackoffTimerCount
WorkflowCleanupDeleteCount
Expand Down Expand Up @@ -2607,6 +2608,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
SignalInfoCount: NewDimensionlessHistogramDef("signal_info_count"),
RequestCancelInfoCount: NewDimensionlessHistogramDef("request_cancel_info_count"),
BufferedEventsCount: NewDimensionlessHistogramDef("buffered_events_count"),
TaskCount: NewDimensionlessHistogramDef("task_count"),
WorkflowRetryBackoffTimerCount: NewCounterDef("workflow_retry_backoff_timer"),
WorkflowCronBackoffTimerCount: NewCounterDef("workflow_cron_backoff_timer"),
WorkflowCleanupDeleteCount: NewCounterDef("workflow_cleanup_delete"),
Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ type (
SignalInfoCount int
SignalRequestIDCount int
BufferedEventsCount int
TaskCountByCategory map[string]int
}

HistoryStatistics struct {
Expand Down
19 changes: 18 additions & 1 deletion common/persistence/size.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

package persistence

import "go.temporal.io/server/service/history/tasks"

func statusOfInternalWorkflow(
state *InternalWorkflowMutableState,
historyStatistics *HistoryStatistics,
Expand Down Expand Up @@ -144,7 +146,8 @@ func statusOfInternalWorkflowMutation(
bufferedEventsSize = mutation.NewBufferedEvents.Size()
}

// TODO what about tasks?
taskCountByCategory := taskCountsByCategory(&mutation.Tasks)

// TODO what about checksum?

totalSize := executionInfoSize
Expand Down Expand Up @@ -184,7 +187,17 @@ func statusOfInternalWorkflowMutation(

BufferedEventsSize: bufferedEventsSize,
BufferedEventsCount: bufferedEventsCount,

TaskCountByCategory: taskCountByCategory,
}
}

func taskCountsByCategory(t *map[tasks.Category][]InternalHistoryTask) map[string]int {
counts := make(map[string]int)
for category, tasks := range *t {
counts[category.Name()] = len(tasks)
}
return counts
}

func statusOfInternalWorkflowSnapshot(
Expand Down Expand Up @@ -229,6 +242,8 @@ func statusOfInternalWorkflowSnapshot(
totalSize += signalRequestIDSize
totalSize += bufferedEventsSize

taskCountByCategory := taskCountsByCategory(&snapshot.Tasks)

return &MutableStateStatistics{
TotalSize: totalSize,
HistoryStatistics: historyStatistics,
Expand Down Expand Up @@ -256,5 +271,7 @@ func statusOfInternalWorkflowSnapshot(

BufferedEventsSize: bufferedEventsSize,
BufferedEventsCount: bufferedEventsCount,

TaskCountByCategory: taskCountByCategory,
}
}
4 changes: 4 additions & 0 deletions service/history/workflow/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func emitMutableStateStatus(
scope.RecordDistribution(metrics.HistorySize, stats.HistoryStatistics.SizeDiff)
scope.RecordDistribution(metrics.HistoryCount, stats.HistoryStatistics.CountDiff)
}

for category, taskCount := range stats.TaskCountByCategory {
scope.Tagged(metrics.TaskCategoryTag(category)).RecordDistribution(metrics.TaskCount, taskCount)
}
}

func emitWorkflowCompletionStats(
Expand Down

0 comments on commit 5eda907

Please sign in to comment.