From e2309d17ba25dbc7f69fd62ea0891fa619822743 Mon Sep 17 00:00:00 2001 From: jswxstw Date: Thu, 21 Mar 2024 15:51:55 +0800 Subject: [PATCH] fix: Enable realtime metric gc after its workflow is completed. Fixes #12790 Signed-off-by: oninowang --- workflow/controller/controller.go | 2 +- workflow/controller/operator.go | 1 + workflow/metrics/metrics.go | 23 ++++++++++++++++++++++- workflow/metrics/metrics_test.go | 20 ++++++++++++++++++-- workflow/metrics/server.go | 6 +++++- 5 files changed, 47 insertions(+), 5 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 0db3adee61ce..d2f3f58ecf8c 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1087,7 +1087,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) DeleteFunc: func(obj interface{}) { wf, ok := obj.(*unstructured.Unstructured) if ok { // maybe cache.DeletedFinalStateUnknown - wfc.metrics.StopRealtimeMetricsForKey(string(wf.GetUID())) + wfc.metrics.DeleteRealtimeMetricsForKey(string(wf.GetUID())) } }, }) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 9a830aed186b..264ac779aeb8 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -802,6 +802,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { // Make sure the workflow completed. if woc.wf.Status.Fulfilled() { + woc.controller.metrics.StopRealtimeMetricsForKey(string(woc.wf.GetUID())) if err := woc.deleteTaskResults(ctx); err != nil { woc.log.WithError(err).Warn("failed to delete task-results") } diff --git a/workflow/metrics/metrics.go b/workflow/metrics/metrics.go index a61516b99a32..ba73cca50515 100644 --- a/workflow/metrics/metrics.go +++ b/workflow/metrics/metrics.go @@ -42,6 +42,8 @@ func (s ServerConfig) SameServerAs(other ServerConfig) bool { type metric struct { metric prometheus.Metric lastUpdated time.Time + realtime bool + completed bool } type Metrics struct { @@ -153,6 +155,23 @@ func (m *Metrics) StopRealtimeMetricsForKey(key string) { return } + realtimeMetrics := m.workflows[key] + for _, metric := range realtimeMetrics { + if realtimeMetric, ok := m.customMetrics[metric]; ok { + realtimeMetric.completed = true + m.customMetrics[metric] = realtimeMetric + } + } +} + +func (m *Metrics) DeleteRealtimeMetricsForKey(key string) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if _, exists := m.workflows[key]; !exists { + return + } + realtimeMetrics := m.workflows[key] for _, metric := range realtimeMetrics { delete(m.customMetrics, metric) @@ -190,12 +209,14 @@ func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prom } else { m.metricNameHelps[name] = help } - m.customMetrics[key] = metric{metric: newMetric, lastUpdated: time.Now()} + customMetric := metric{metric: newMetric, lastUpdated: time.Now()} // If this is a realtime metric, track it if realtime { + customMetric.realtime = true m.workflows[ownerKey] = append(m.workflows[ownerKey], key) } + m.customMetrics[key] = customMetric return nil } diff --git a/workflow/metrics/metrics_test.go b/workflow/metrics/metrics_test.go index 857a7e67ec80..baf1da833cce 100644 --- a/workflow/metrics/metrics_test.go +++ b/workflow/metrics/metrics_test.go @@ -112,6 +112,10 @@ func TestMetricGC(t *testing.T) { if assert.NoError(t, err) { assert.Len(t, m.customMetrics, 1) } + err = m.UpsertCustomMetric("realtime_metric", "workflow-uid", newCounter("test", "test", nil), true) + if assert.NoError(t, err) { + assert.Len(t, m.customMetrics, 2) + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -127,7 +131,19 @@ func TestMetricGC(t *testing.T) { // Sleep to prevent overloading test worker CPU. time.Sleep(100 * time.Millisecond) } + assert.Len(t, m.customMetrics, 1) + m.StopRealtimeMetricsForKey("workflow-uid") + timeoutTime = time.Now().Add(time.Second * 2) + // Ensure we get at least one TTL run + for time.Now().Before(timeoutTime) { + // Break if we know our test will pass. + if len(m.customMetrics) == 0 { + break + } + // Sleep to prevent overloading test worker CPU. + time.Sleep(100 * time.Millisecond) + } assert.Len(t, m.customMetrics, 0) } @@ -171,7 +187,7 @@ func TestRealTimeMetricDeletion(t *testing.T) { assert.NotEmpty(t, m.workflows["123"]) assert.Len(t, m.customMetrics, 1) - m.StopRealtimeMetricsForKey("123") + m.DeleteRealtimeMetricsForKey("123") assert.Empty(t, m.workflows["123"]) assert.Len(t, m.customMetrics, 0) @@ -183,7 +199,7 @@ func TestRealTimeMetricDeletion(t *testing.T) { assert.Empty(t, m.workflows["456"]) assert.Len(t, m.customMetrics, 1) - m.StopRealtimeMetricsForKey("456") + m.DeleteRealtimeMetricsForKey("456") assert.Empty(t, m.workflows["456"]) assert.Len(t, m.customMetrics, 1) } diff --git a/workflow/metrics/server.go b/workflow/metrics/server.go index cde6ac4bf3b8..13642474682d 100644 --- a/workflow/metrics/server.go +++ b/workflow/metrics/server.go @@ -138,7 +138,11 @@ func (m *Metrics) garbageCollector(ctx context.Context) { return case <-ticker.C: for key, metric := range m.customMetrics { - if time.Since(metric.lastUpdated) > m.metricsConfig.TTL { + if metric.realtime { + if metric.completed && time.Since(metric.lastUpdated) > m.metricsConfig.TTL { + delete(m.customMetrics, key) + } + } else if time.Since(metric.lastUpdated) > m.metricsConfig.TTL { delete(m.customMetrics, key) } }