[connector/spanmetrics] Produce delta temporality span metrics with timestamps representing an uninterrupted series (#31780)

Closes #31671

**Description:** 
Currently, delta temporality span metrics are produced with
(StartTimestamp, Timestamp) pairs of `(T1, T2), (T3, T4) ...`. However, the
[specification](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#temporality)
says that the correct pattern for an uninterrupted delta series is `(T1,
T2), (T2, T3) ...`
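
For illustration only (a hypothetical helper, not part of this PR), a minimal Go sketch of how chained flush timestamps form that uninterrupted pattern:

```go
package main

import "fmt"

// deltaIntervals pairs successive flush times so each data point's start
// timestamp equals the previous data point's end timestamp:
// (T1, T2), (T2, T3), (T3, T4) ...
func deltaIntervals(flushTimes []int64) [][2]int64 {
	intervals := make([][2]int64, 0, len(flushTimes))
	for i := 1; i < len(flushTimes); i++ {
		intervals = append(intervals, [2]int64{flushTimes[i-1], flushTimes[i]})
	}
	return intervals
}

func main() {
	// Four flushes at T1..T4 yield three uninterrupted delta intervals.
	fmt.Println(deltaIntervals([]int64{1, 2, 3, 4})) // [[1 2] [2 3] [3 4]]
}
```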

This misalignment with the spec can confuse downstream components'
conversion from delta temporality to cumulative temporality, causing
each data point to be viewed as a cumulative counter "reset". An example
of this is in `prometheusexporter`.

The conversion issue forces you to generate cumulative span metrics,
which use significantly more memory to cache the cumulative counts.

At work, I applied this patch to our collectors and switched to
producing delta temporality metrics for `prometheusexporter` to then
convert to cumulative. That caused a significant drop in memory usage:


![image](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/17691679/804d0792-1085-400e-a4e3-d64fb865cd4f)

**Testing:** 
- Unit tests asserting the timestamps
- Manual testing with `prometheusexporter` to make sure counter values
are cumulative and no longer being reset after receiving each delta data
point
swar8080 authored Jun 4, 2024
1 parent d9ebb84 commit 9395f36
Showing 12 changed files with 371 additions and 39 deletions.
27 changes: 27 additions & 0 deletions .chloggen/spanmetrics_uninterrupted_delta_timestamps.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetrics

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Produce delta temporality span metrics with StartTimeUnixNano and TimeUnixNano values representing an uninterrupted series

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31671, 30688]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This allows producing delta span metrics instead of the more memory-intensive cumulative metrics, specifically when a downstream component can convert the delta metrics to cumulative.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 2 additions & 1 deletion connector/spanmetricsconnector/README.md
@@ -114,8 +114,9 @@ The following settings can be optionally configured:
- `namespace`: Defines the namespace of the generated metrics. If `namespace` provided, generated metric name will be added `namespace.` prefix.
- `metrics_flush_interval` (default: `60s`): Defines the flush interval of the generated metrics.
- `metrics_expiration` (default: `0`): Defines the expiration time as `time.Duration`, after which, if no new spans are received, metrics will no longer be exported. Setting to `0` means the metrics will never expire (default behavior).
- `metric_timestamp_cache_size` (default `1000`): Only relevant for delta temporality span metrics. Controls the size of the cache used to keep track of a metric's TimestampUnixNano the last time it was flushed. When a metric is evicted from the cache, its next data point will indicate a "reset" in the series. Downstream components converting from delta to cumulative, like `prometheusexporter`, may handle these resets by setting cumulative counters back to 0. (See the config sketch below.)
- `exemplars`: Use to configure how to attach exemplars to metrics.
- `enabled` (default: `false`): enabling will add spans as Exemplars to all metrics. Exemplars are only kept for one flush interval.
- `enabled` (default: `false`): enabling will add spans as Exemplars to all metrics. Exemplars are only kept for one flush interval.
- `events`: Use to configure the events metric.
- `enabled`: (default: `false`): enabling will add the events metric.
- `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.
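
As a usage illustration (my addition, not part of this diff), a hedged config sketch showing how the new option might be set alongside delta temporality; the value `1000` mirrors the documented default:

```yaml
connectors:
  spanmetrics:
    aggregation_temporality: AGGREGATION_TEMPORALITY_DELTA
    # Size this to the expected number of active series; evictions cause
    # the next data point of an evicted series to read as a "reset".
    metric_timestamp_cache_size: 1000
```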
19 changes: 19 additions & 0 deletions connector/spanmetricsconnector/config.go
@@ -23,6 +23,8 @@ var defaultHistogramBucketsMs = []float64{
2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
}

var defaultDeltaTimestampCacheSize = 1000

// Dimension defines the dimension name and optional default value if the Dimension is missing from a span attribute.
type Dimension struct {
Name string `mapstructure:"name"`
@@ -71,6 +73,9 @@ type Config struct {
// Default value (0) means that the metrics will never expire.
MetricsExpiration time.Duration `mapstructure:"metrics_expiration"`

// TimestampCacheSize controls the size of the cache used to keep track of delta metrics' TimestampUnixNano the last time it was flushed
TimestampCacheSize *int `mapstructure:"metric_timestamp_cache_size"`

// Namespace is the namespace of the metrics emitted by the connector.
Namespace string `mapstructure:"namespace"`

@@ -139,6 +144,13 @@ func (c Config) Validate() error {
return fmt.Errorf("invalid metrics_expiration: %v, the duration should be positive", c.MetricsExpiration)
}

if c.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta && c.GetDeltaTimestampCacheSize() <= 0 {
return fmt.Errorf(
"invalid delta timestamp cache size: %v, the maximum number of the items in the cache should be positive",
c.GetDeltaTimestampCacheSize(),
)
}

return nil
}

@@ -151,6 +163,13 @@ func (c Config) GetAggregationTemporality() pmetric.AggregationTemporality {
return pmetric.AggregationTemporalityCumulative
}

func (c Config) GetDeltaTimestampCacheSize() int {
if c.TimestampCacheSize != nil {
return *c.TimestampCacheSize
}
return defaultDeltaTimestampCacheSize
}

// validateDimensions checks duplicates for reserved dimensions and additional dimensions.
func validateDimensions(dimensions []Dimension) error {
labelNames := make(map[string]struct{})
39 changes: 36 additions & 3 deletions connector/spanmetricsconnector/config_test.go
@@ -27,10 +27,12 @@ func TestLoadConfig(t *testing.T) {

defaultMethod := "GET"
defaultMaxPerDatapoint := 5
customTimestampCacheSize := 123
tests := []struct {
id component.ID
expected component.Config
errorMessage string
id component.ID
expected component.Config
errorMessage string
extraAssertions func(config *Config)
}{
{
id: component.NewIDWithName(metadata.Type, "default"),
@@ -125,6 +127,34 @@ func TestLoadConfig(t *testing.T) {
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
},
},
{
id: component.NewIDWithName(metadata.Type, "custom_delta_timestamp_cache_size"),
expected: &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_DELTA",
TimestampCacheSize: &customTimestampCacheSize,
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
MetricsFlushInterval: 60 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
},
},
{
id: component.NewIDWithName(metadata.Type, "default_delta_timestamp_cache_size"),
expected: &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_DELTA",
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
MetricsFlushInterval: 60 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
},
extraAssertions: func(config *Config) {
assert.Equal(t, defaultDeltaTimestampCacheSize, config.GetDeltaTimestampCacheSize())
},
},
{
id: component.NewIDWithName(metadata.Type, "invalid_delta_timestamp_cache_size"),
errorMessage: "invalid delta timestamp cache size: 0, the maximum number of the items in the cache should be positive",
},
}

for _, tt := range tests {
@@ -143,6 +173,9 @@ func TestLoadConfig(t *testing.T) {
}
assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
if tt.extraAssertions != nil {
tt.extraAssertions(cfg.(*Config))
}
})
}
}
52 changes: 46 additions & 6 deletions connector/spanmetricsconnector/connector.go
@@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/lightstep/go-expohisto/structure"
"github.com/tilinna/clock"
"go.opentelemetry.io/collector/component"
@@ -72,6 +73,9 @@ type connectorImp struct {
eDimensions []dimension

events EventsConfig

// Tracks the last TimestampUnixNano for delta metrics so that they represent an uninterrupted series. Unused for cumulative span metrics.
lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp]
}

type resourceMetrics struct {
@@ -125,6 +129,16 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
resourceMetricsKeyAttributes[attr] = s
}

var lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp]
if cfg.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
lastDeltaTimestamps, err = simplelru.NewLRU[metrics.Key, pcommon.Timestamp](cfg.GetDeltaTimestampCacheSize(), func(k metrics.Key, _ pcommon.Timestamp) {
logger.Info("Evicting cached delta timestamp", zap.String("key", string(k)))
})
if err != nil {
return nil, err
}
}

return &connectorImp{
logger: logger,
config: *cfg,
Expand All @@ -133,6 +147,7 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
dimensions: newDimensions(cfg.Dimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
lastDeltaTimestamps: lastDeltaTimestamps,
ticker: ticker,
done: make(chan struct{}),
eDimensions: newDimensions(cfg.Events.Dimensions),
@@ -251,6 +266,7 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
// buildMetrics collects the computed raw metrics data and builds OTLP metrics.
func (p *connectorImp) buildMetrics() pmetric.Metrics {
m := pmetric.NewMetrics()
timestamp := pcommon.NewTimestampFromTime(time.Now())

p.resourceMetrics.ForEach(func(_ resourceKey, rawMetrics *resourceMetrics) {
rm := m.ResourceMetrics().AppendEmpty()
@@ -259,23 +275,46 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
sm := rm.ScopeMetrics().AppendEmpty()
sm.Scope().SetName("spanmetricsconnector")

/**
* To represent an uninterrupted stream of metrics as per the spec, the (StartTimestamp, Timestamp)'s of successive data points should be:
* - For cumulative metrics: (T1, T2), (T1, T3), (T1, T4) ...
* - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ...
*/
deltaMetricKeys := make(map[metrics.Key]bool)
startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp {
startTime := rawMetrics.startTimestamp
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok {
startTime = lastTimestamp
}
// Collect lastDeltaTimestamps keys that need to be updated. Metrics can share the same key, so defer the update.
deltaMetricKeys[mk] = true
}
return startTime
}

sums := rawMetrics.sums
metric := sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
sums.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
if !p.config.Histogram.Disable {
histograms := rawMetrics.histograms
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
}

events := rawMetrics.events
if p.events.Enabled {
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
events.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
}

for mk := range deltaMetricKeys {
// For delta metrics, cache the current data point's timestamp, which will be the start timestamp for the next data points in the series
p.lastDeltaTimestamps.Add(mk, timestamp)
}
})

@@ -326,6 +365,7 @@ func (p *connectorImp) resetState() {
// and span metadata such as name, kind, status_code and any additional
// dimensions the user has configured.
func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
startTimestamp := pcommon.NewTimestampFromTime(time.Now())
for i := 0; i < traces.ResourceSpans().Len(); i++ {
rspans := traces.ResourceSpans().At(i)
resourceAttr := rspans.Resource().Attributes()
@@ -334,7 +374,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
continue
}

rm := p.getOrCreateResourceMetrics(resourceAttr)
rm := p.getOrCreateResourceMetrics(resourceAttr, startTimestamp)
sums := rm.sums
histograms := rm.histograms
events := rm.events
@@ -431,7 +471,7 @@ func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey {
return pdatautil.MapHash(m)
}

func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimestamp pcommon.Timestamp) *resourceMetrics {
key := p.createResourceKey(attr)
v, ok := p.resourceMetrics.Get(key)
if !ok {
@@ -440,7 +480,7 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMet
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
attributes: attr,
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
startTimestamp: startTimestamp,
}
p.resourceMetrics.Add(key, v)
}
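
To distill the technique from `buildMetrics` above, here is a hedged standalone sketch (simplified string keys and `time.Time` values in place of the connector's `metrics.Key` and `pcommon.Timestamp`), using the same `simplelru` package the diff imports:

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/simplelru"
)

// startTime returns the start timestamp for a series' next delta data
// point: the timestamp cached at the last flush, or firstSeen on a cache
// miss (the start of a new, or reset, series).
func startTime(cache *simplelru.LRU[string, time.Time], key string, firstSeen time.Time) time.Time {
	if last, ok := cache.Get(key); ok {
		return last
	}
	return firstSeen
}

func main() {
	cache, _ := simplelru.NewLRU[string, time.Time](1000, nil)
	t1 := time.Unix(1, 0)

	// First flush at T2: no cached entry, so the interval is (T1, T2).
	t2 := time.Unix(2, 0)
	fmt.Println(startTime(cache, "calls|svcA", t1).Unix(), t2.Unix()) // 1 2
	cache.Add("calls|svcA", t2) // remember T2 after flushing

	// Second flush at T3: the cached T2 becomes the start, giving (T2, T3).
	t3 := time.Unix(3, 0)
	fmt.Println(startTime(cache, "calls|svcA", t1).Unix(), t3.Unix()) // 2 3
}
```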
