Skip to content

Commit

Permalink
[connector/servicegraphconnector] Gracefully handle half-cleaned metr…
Browse files Browse the repository at this point in the history
…ics (#31700)

The `cleanCache` function cleans up series that haven't been used in
15mins. It does this by:
1. Deleting it from p.keyToMetric (which holds the dimensions for the
metric), then
2. Deleting it from all the metric series maps, e.g. p.reqTotal

In parallel, the metrics are collected by:
1. Looping through all items in the metric series maps, e.g. p.reqTotal
2. For each, gets the metric's dimensions from p.keyToMetric

Because these occur in opposite orders, we can get into a sticky
situation where the collector function errors out when getting the
metric's dimensions.

This PR reverses the order in the cleanup function, hopefully avoiding
this situation.

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

**Link to tracking Issue:** <Issue number if applicable>

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>
  • Loading branch information
samwright committed Mar 26, 2024
1 parent 2713d4a commit 3c9a357
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 6 deletions.
27 changes: 27 additions & 0 deletions .chloggen/handle-half-cleaned-up-servicegraph-metric.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: servicegraphprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix 'failed to find dimensions for key' error from race condition in metrics cleanup.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [ 31701 ]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
12 changes: 6 additions & 6 deletions connector/servicegraphconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,12 +666,6 @@ func (p *serviceGraphConnector) cleanCache() {
}
p.metricMutex.RUnlock()

p.metricMutex.Lock()
for _, key := range staleSeries {
delete(p.keyToMetric, key)
}
p.metricMutex.Unlock()

p.seriesMutex.Lock()
for _, key := range staleSeries {
delete(p.reqTotal, key)
Expand All @@ -684,6 +678,12 @@ func (p *serviceGraphConnector) cleanCache() {
delete(p.reqServerDurationSecondsBucketCounts, key)
}
p.seriesMutex.Unlock()

p.metricMutex.Lock()
for _, key := range staleSeries {
delete(p.keyToMetric, key)
}
p.metricMutex.Unlock()
}

// spanDuration returns the duration of the given span in seconds (legacy ms).
Expand Down
67 changes: 67 additions & 0 deletions connector/servicegraphconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,73 @@ func TestStaleSeriesCleanup(t *testing.T) {
assert.NoError(t, p.Shutdown(context.Background()))
}

func TestMapsAreConsistentDuringCleanup(t *testing.T) {
// Prepare
cfg := &Config{
MetricsExporter: "mock",
Dimensions: []string{"some-attribute", "non-existing-attribute"},
Store: StoreConfig{
MaxItems: 10,
TTL: time.Second,
},
}

mockMetricsExporter := newMockMetricsExporter()

set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
p := newConnector(set, cfg)

mHost := newMockHost(map[component.DataType]map[component.ID]component.Component{
component.DataTypeMetrics: {
component.MustNewID("mock"): mockMetricsExporter,
},
})

assert.NoError(t, p.Start(context.Background(), mHost))

// ConsumeTraces
td := buildSampleTrace(t, "first")
assert.NoError(t, p.ConsumeTraces(context.Background(), td))

// Make series stale and force a cache cleanup
for key, metric := range p.keyToMetric {
metric.lastUpdated = 0
p.keyToMetric[key] = metric
}

// Start cleanup, but use locks to pretend that we are:
// - currently collecting metrics (so seriesMutex is locked)
// - currently getting dimensions for that series (so metricMutex is locked)
p.seriesMutex.Lock()
p.metricMutex.RLock()
go p.cleanCache()

// Since everything is locked, nothing has happened, so both should still have length 1
assert.Equal(t, 1, len(p.reqTotal))
assert.Equal(t, 1, len(p.keyToMetric))

// Now we pretend that we have stopped collecting metrics, by unlocking seriesMutex
p.seriesMutex.Unlock()

// Make sure cleanupCache has continued to the next mutex
time.Sleep(time.Millisecond)
p.seriesMutex.Lock()

// The expired series should have been removed. The metrics collector now won't look
// for dimensions from that series. It's important that it happens this way around,
// instead of deleting it from `keyToMetric`, otherwise the metrics collector will try
// and fail to find dimensions for a series that is about to be removed.
assert.Equal(t, 0, len(p.reqTotal))
assert.Equal(t, 1, len(p.keyToMetric))

p.metricMutex.RUnlock()
p.seriesMutex.Unlock()

// Shutdown the connector
assert.NoError(t, p.Shutdown(context.Background()))
}

func setupTelemetry(reader *sdkmetric.ManualReader) component.TelemetrySettings {
settings := componenttest.NewNopTelemetrySettings()
settings.MetricsLevel = configtelemetry.LevelNormal
Expand Down

0 comments on commit 3c9a357

Please sign in to comment.