Skip to content

Commit

Permalink
fix: Enable realtime metric gc after its workflow is completed. Fixes a…
Browse files Browse the repository at this point in the history
…rgoproj#12790

Signed-off-by: oninowang <oninowang@tencent.com>
  • Loading branch information
jswxstw authored and oninowang committed Mar 21, 2024
1 parent 17087ec commit e2309d1
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 5 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
Expand Up @@ -1087,7 +1087,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
DeleteFunc: func(obj interface{}) {
wf, ok := obj.(*unstructured.Unstructured)
if ok { // maybe cache.DeletedFinalStateUnknown
wfc.metrics.StopRealtimeMetricsForKey(string(wf.GetUID()))
wfc.metrics.DeleteRealtimeMetricsForKey(string(wf.GetUID()))
}
},
})
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/operator.go
Expand Up @@ -802,6 +802,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {

// Make sure the workflow completed.
if woc.wf.Status.Fulfilled() {
woc.controller.metrics.StopRealtimeMetricsForKey(string(woc.wf.GetUID()))
if err := woc.deleteTaskResults(ctx); err != nil {
woc.log.WithError(err).Warn("failed to delete task-results")
}
Expand Down
23 changes: 22 additions & 1 deletion workflow/metrics/metrics.go
Expand Up @@ -42,6 +42,8 @@ func (s ServerConfig) SameServerAs(other ServerConfig) bool {
type metric struct {
metric prometheus.Metric
lastUpdated time.Time
realtime bool
completed bool
}

type Metrics struct {
Expand Down Expand Up @@ -153,6 +155,23 @@ func (m *Metrics) StopRealtimeMetricsForKey(key string) {
return
}

realtimeMetrics := m.workflows[key]
for _, metric := range realtimeMetrics {
if realtimeMetric, ok := m.customMetrics[metric]; ok {
realtimeMetric.completed = true
m.customMetrics[metric] = realtimeMetric
}
}
}

func (m *Metrics) DeleteRealtimeMetricsForKey(key string) {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, exists := m.workflows[key]; !exists {
return
}

realtimeMetrics := m.workflows[key]
for _, metric := range realtimeMetrics {
delete(m.customMetrics, metric)
Expand Down Expand Up @@ -190,12 +209,14 @@ func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prom
} else {
m.metricNameHelps[name] = help
}
m.customMetrics[key] = metric{metric: newMetric, lastUpdated: time.Now()}

customMetric := metric{metric: newMetric, lastUpdated: time.Now()}
// If this is a realtime metric, track it
if realtime {
customMetric.realtime = true
m.workflows[ownerKey] = append(m.workflows[ownerKey], key)
}
m.customMetrics[key] = customMetric

return nil
}
Expand Down
20 changes: 18 additions & 2 deletions workflow/metrics/metrics_test.go
Expand Up @@ -112,6 +112,10 @@ func TestMetricGC(t *testing.T) {
if assert.NoError(t, err) {
assert.Len(t, m.customMetrics, 1)
}
err = m.UpsertCustomMetric("realtime_metric", "workflow-uid", newCounter("test", "test", nil), true)
if assert.NoError(t, err) {
assert.Len(t, m.customMetrics, 2)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -127,7 +131,19 @@ func TestMetricGC(t *testing.T) {
// Sleep to prevent overloading test worker CPU.
time.Sleep(100 * time.Millisecond)
}
assert.Len(t, m.customMetrics, 1)

m.StopRealtimeMetricsForKey("workflow-uid")
timeoutTime = time.Now().Add(time.Second * 2)
// Ensure we get at least one TTL run
for time.Now().Before(timeoutTime) {
// Break if we know our test will pass.
if len(m.customMetrics) == 0 {
break
}
// Sleep to prevent overloading test worker CPU.
time.Sleep(100 * time.Millisecond)
}
assert.Len(t, m.customMetrics, 0)
}

Expand Down Expand Up @@ -171,7 +187,7 @@ func TestRealTimeMetricDeletion(t *testing.T) {
assert.NotEmpty(t, m.workflows["123"])
assert.Len(t, m.customMetrics, 1)

m.StopRealtimeMetricsForKey("123")
m.DeleteRealtimeMetricsForKey("123")
assert.Empty(t, m.workflows["123"])
assert.Len(t, m.customMetrics, 0)

Expand All @@ -183,7 +199,7 @@ func TestRealTimeMetricDeletion(t *testing.T) {
assert.Empty(t, m.workflows["456"])
assert.Len(t, m.customMetrics, 1)

m.StopRealtimeMetricsForKey("456")
m.DeleteRealtimeMetricsForKey("456")
assert.Empty(t, m.workflows["456"])
assert.Len(t, m.customMetrics, 1)
}
6 changes: 5 additions & 1 deletion workflow/metrics/server.go
Expand Up @@ -138,7 +138,11 @@ func (m *Metrics) garbageCollector(ctx context.Context) {
return
case <-ticker.C:
for key, metric := range m.customMetrics {
if time.Since(metric.lastUpdated) > m.metricsConfig.TTL {
if metric.realtime {
if metric.completed && time.Since(metric.lastUpdated) > m.metricsConfig.TTL {
delete(m.customMetrics, key)
}
} else if time.Since(metric.lastUpdated) > m.metricsConfig.TTL {
delete(m.customMetrics, key)
}
}
Expand Down

0 comments on commit e2309d1

Please sign in to comment.