From ff186733fffc7d99770b79584d6d546a4f8f9dab Mon Sep 17 00:00:00 2001
From: Pablo Estrada
Date: Fri, 24 May 2024 22:29:02 -0400
Subject: [PATCH] Adding support for SET metric type for StatsD metrics.
 Converts into a Gauge counting the number of individual events per
 aggregation period

---
 .../internal/protocol/metric_translator.go    | 23 ++++++
 .../internal/protocol/statsd_parser.go        | 24 ++++++-
 .../internal/protocol/statsd_parser_test.go   | 70 +++++++++++++++++++
 3 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/receiver/statsdreceiver/internal/protocol/metric_translator.go b/receiver/statsdreceiver/internal/protocol/metric_translator.go
index cd79d10e34dbc..4a4216ced8f5e 100644
--- a/receiver/statsdreceiver/internal/protocol/metric_translator.go
+++ b/receiver/statsdreceiver/internal/protocol/metric_translator.go
@@ -67,6 +67,29 @@ func buildGaugeMetric(parsedMetric statsDMetric, timeNow time.Time) pmetric.Scop
 	return ilm
 }
 
+// buildGaugeMetricFromSet converts the values collected for one StatsD set
+// into a gauge with a single data point holding the number of distinct
+// values, stamped with timeNow and carrying the description's attributes.
+func buildGaugeMetricFromSet(desc statsDMetricDescription, setValues summaryMetric, timeNow time.Time) pmetric.ScopeMetrics {
+	ilm := pmetric.NewScopeMetrics()
+	nm := ilm.Metrics().AppendEmpty()
+	nm.SetName(desc.name)
+	dp := nm.SetEmptyGauge().DataPoints().AppendEmpty()
+	dp.SetTimestamp(pcommon.NewTimestampFromTime(timeNow))
+
+	uniques := make(map[float64]struct{})
+	for _, v := range setValues.points {
+		uniques[v] = struct{}{}
+	}
+	for i := desc.attrs.Iter(); i.Next(); {
+		dp.Attributes().PutStr(string(i.Attribute().Key), i.Attribute().Value.AsString())
+	}
+	// Set the value exactly once: a data point holds a single value, so an
+	// earlier SetIntValue would just be overwritten by SetDoubleValue.
+	dp.SetDoubleValue(float64(len(uniques)))
+	return ilm
+}
+
 func buildSummaryMetric(desc statsDMetricDescription, summary summaryMetric, startTime, timeNow time.Time, percentiles []float64, ilm pmetric.ScopeMetrics) {
 	nm := ilm.Metrics().AppendEmpty()
 	nm.SetName(desc.name)
diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser.go b/receiver/statsdreceiver/internal/protocol/statsd_parser.go
index ccbcdd6d108a1..fc18d9ed903d8 100644
--- a/receiver/statsdreceiver/internal/protocol/statsd_parser.go
+++ b/receiver/statsdreceiver/internal/protocol/statsd_parser.go
@@ -39,6 +39,7 @@ const (
 	HistogramType    MetricType = "h"
 	TimingType       MetricType = "ms"
 	DistributionType MetricType = "d"
+	SetType          MetricType = "s"
 
 	CounterTypeName      TypeName = "counter"
 	GaugeTypeName        TypeName = "gauge"
@@ -46,6 +47,7 @@ const (
 	TimingTypeName       TypeName = "timing"
 	TimingAltTypeName    TypeName = "timer"
 	DistributionTypeName TypeName = "distribution"
+	SetTypeName          TypeName = "set"
 
 	GaugeObserver     ObserverType = "gauge"
 	SummaryObserver   ObserverType = "summary"
@@ -94,6 +96,7 @@ type instruments struct {
 	counters               map[statsDMetricDescription]pmetric.ScopeMetrics
 	summaries              map[statsDMetricDescription]summaryMetric
 	histograms             map[statsDMetricDescription]histogramMetric
+	sets                   map[statsDMetricDescription]summaryMetric // Sets must use summaries to count unique values
 	timersAndDistributions []pmetric.ScopeMetrics
 }
 
@@ -104,6 +107,7 @@ func newInstruments(addr net.Addr) *instruments {
 		counters:   make(map[statsDMetricDescription]pmetric.ScopeMetrics),
 		summaries:  make(map[statsDMetricDescription]summaryMetric),
 		histograms: make(map[statsDMetricDescription]histogramMetric),
+		sets:       make(map[statsDMetricDescription]summaryMetric),
 	}
 }
 
@@ -150,6 +154,8 @@ func (t MetricType) FullName() TypeName {
 		return HistogramTypeName
 	case DistributionType:
 		return DistributionTypeName
+	case SetType:
+		return SetTypeName
 	}
 	return TypeName(fmt.Sprintf("unknown(%s)", t))
 }
@@ -242,6 +248,10 @@ func (p *StatsDParser) GetMetrics() []BatchMetrics {
 			)
 		}
 
+		for desc, setMetric := range instrument.sets {
+			p.copyMetricAndScope(rm, buildGaugeMetricFromSet(desc, setMetric, now))
+		}
+
 		batchMetrics = append(batchMetrics, batch)
 	}
 	p.resetState(now)
@@ -267,7 +277,7 @@ func (p *StatsDParser) observerCategoryFor(t MetricType) ObserverCategory {
 		return p.histogramEvents
 	case TimingType:
 		return p.timerEvents
-	case CounterType, GaugeType:
+	case CounterType, GaugeType, SetType:
 	}
 	return defaultObserverCategory
 }
@@ -309,6 +319,16 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
 			point.SetIntValue(point.IntValue() + parsedMetric.counterValue())
 		}
 
+	case SetType:
+		// Keep every observed value; duplicates are collapsed when the gauge is
+		// built. Appending to the zero-value entry also covers the first
+		// observation, so no existence check is needed.
+		// TODO: Should this be a different type?
+		set := instrument.sets[parsedMetric.description]
+		set.points = append(set.points, parsedMetric.asFloat)
+		set.weights = append(set.weights, 1)
+		instrument.sets[parsedMetric.description] = set
+
 	case TimingType, HistogramType, DistributionType:
 		category := p.observerCategoryFor(parsedMetric.description.metricType)
 		switch category.method {
@@ -380,7 +400,7 @@ func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags b
 
 	inType := MetricType(parts[1])
 	switch inType {
-	case CounterType, GaugeType, HistogramType, TimingType, DistributionType:
+	case CounterType, GaugeType, HistogramType, TimingType, DistributionType, SetType:
 		result.description.metricType = inType
 	default:
 		return result, fmt.Errorf("unsupported metric type: %s", inType)
diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go
index ff32cbc2069e4..1bc6a2087f0b4 100644
--- a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go
+++ b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go
@@ -289,6 +289,15 @@ func Test_ParseMessageToMetric(t *testing.T) {
 				false,
 				"h", 0, nil, nil, 0),
 		},
+		{
+			name:  "simple set",
+			input: "test.metric.set:42|s",
+			wantMetric: testStatsDMetric(
+				"test.metric.set",
+				42,
+				false,
+				"s", 0, nil, nil, 0),
+		},
 	}
 
 	for _, tt := range tests {
@@ -1913,3 +1922,64 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) {
 		})
 	}
 }
+
+// TestStatsDParser_AggregateSets verifies that set ("s") lines are accumulated
+// per metric description, duplicates included, in the parser's sets map.
+func TestStatsDParser_AggregateSets(t *testing.T) {
+	// Pin the clock for this test and restore it afterwards so the override
+	// does not leak into other tests.
+	prevTimeNow := timeNowFunc
+	t.Cleanup(func() { timeNowFunc = prevTimeNow })
+	timeNowFunc = func() time.Time {
+		return time.Unix(711, 0)
+	}
+
+	tests := []struct {
+		name         string
+		input        []string
+		expectedSets map[statsDMetricDescription]summaryMetric
+		err          error
+	}{
+		{
+			name: "set",
+			input: []string{
+				"statsdTestMetric1:300|s|#mykey:myvalue",
+				"statsdTestMetric1:100|s|#mykey:myvalue",
+				"statsdTestMetric1:300|s|#mykey:myvalue",
+				"statsdTestMetric1:200|s|#mykey:myvalue",
+			},
+			expectedSets: map[statsDMetricDescription]summaryMetric{
+				testDescription("statsdTestMetric1", "s",
+					[]string{"mykey"}, []string{"myvalue"}): {
+					points:  []float64{300, 100, 300, 200},
+					weights: []float64{1, 1, 1, 1},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			p := &StatsDParser{}
+			assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}}))
+			addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
+			addrKey := newNetAddr(addr)
+
+			// Stop at the first failure so an early error is not masked by a
+			// later line that aggregates successfully.
+			var err error
+			for _, line := range tt.input {
+				if err = p.Aggregate(line, addr); err != nil {
+					break
+				}
+			}
+			if tt.err != nil {
+				assert.Equal(t, tt.err, err)
+				return
+			}
+			if !assert.NoError(t, err) {
+				return
+			}
+			assert.EqualValues(t, tt.expectedSets, p.instrumentsByAddress[addrKey].sets)
+		})
+	}
+}