Skip to content

Commit

Permalink
Adding support for SET metric type for StatsD metrics. Converts into …
Browse files Browse the repository at this point in the history
…a Gauge counting the number of individual events per aggregation period
  • Loading branch information
pabloem committed May 25, 2024
1 parent 4fad287 commit ff18673
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 2 deletions.
19 changes: 19 additions & 0 deletions receiver/statsdreceiver/internal/protocol/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,25 @@ func buildGaugeMetric(parsedMetric statsDMetric, timeNow time.Time) pmetric.Scop
return ilm
}

func buildGaugeMetricFromSet(desc statsDMetricDescription, setValues summaryMetric, timeNow time.Time) pmetric.ScopeMetrics {
ilm := pmetric.NewScopeMetrics()
nm := ilm.Metrics().AppendEmpty()
nm.SetName(desc.name)
dp := nm.SetEmptyGauge().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.NewTimestampFromTime(timeNow))

uniques := make(map[float64]struct{})
for i := range setValues.points {
uniques[setValues.points[i]] = struct{}{}
}
for i := desc.attrs.Iter(); i.Next(); {
dp.Attributes().PutStr(string(i.Attribute().Key), i.Attribute().Value.AsString())
}
dp.SetIntValue(int64(len(uniques)))
dp.SetDoubleValue(float64(len(uniques)))
return ilm
}

func buildSummaryMetric(desc statsDMetricDescription, summary summaryMetric, startTime, timeNow time.Time, percentiles []float64, ilm pmetric.ScopeMetrics) {
nm := ilm.Metrics().AppendEmpty()
nm.SetName(desc.name)
Expand Down
27 changes: 25 additions & 2 deletions receiver/statsdreceiver/internal/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ const (
HistogramType MetricType = "h"
TimingType MetricType = "ms"
DistributionType MetricType = "d"
SetType MetricType = "s"

CounterTypeName TypeName = "counter"
GaugeTypeName TypeName = "gauge"
HistogramTypeName TypeName = "histogram"
TimingTypeName TypeName = "timing"
TimingAltTypeName TypeName = "timer"
DistributionTypeName TypeName = "distribution"
SetTypeName TypeName = "set"

GaugeObserver ObserverType = "gauge"
SummaryObserver ObserverType = "summary"
Expand Down Expand Up @@ -94,6 +96,7 @@ type instruments struct {
counters map[statsDMetricDescription]pmetric.ScopeMetrics
summaries map[statsDMetricDescription]summaryMetric
histograms map[statsDMetricDescription]histogramMetric
sets map[statsDMetricDescription]summaryMetric // Sets must use summaries to count unique values
timersAndDistributions []pmetric.ScopeMetrics
}

Expand All @@ -104,6 +107,7 @@ func newInstruments(addr net.Addr) *instruments {
counters: make(map[statsDMetricDescription]pmetric.ScopeMetrics),
summaries: make(map[statsDMetricDescription]summaryMetric),
histograms: make(map[statsDMetricDescription]histogramMetric),
sets: make(map[statsDMetricDescription]summaryMetric),
}
}

Expand Down Expand Up @@ -150,6 +154,8 @@ func (t MetricType) FullName() TypeName {
return HistogramTypeName
case DistributionType:
return DistributionTypeName
case SetType:
return SetTypeName
}
return TypeName(fmt.Sprintf("unknown(%s)", t))
}
Expand Down Expand Up @@ -242,6 +248,10 @@ func (p *StatsDParser) GetMetrics() []BatchMetrics {
)
}

for desc, setMetric := range instrument.sets {
p.copyMetricAndScope(rm, buildGaugeMetricFromSet(desc, setMetric, now))
}

batchMetrics = append(batchMetrics, batch)
}
p.resetState(now)
Expand All @@ -267,7 +277,7 @@ func (p *StatsDParser) observerCategoryFor(t MetricType) ObserverCategory {
return p.histogramEvents
case TimingType:
return p.timerEvents
case CounterType, GaugeType:
case CounterType, GaugeType, SetType:
}
return defaultObserverCategory
}
Expand Down Expand Up @@ -309,6 +319,19 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
point.SetIntValue(point.IntValue() + parsedMetric.counterValue())
}

case SetType:
if existing, ok := instrument.sets[parsedMetric.description]; !ok {
instrument.sets[parsedMetric.description] = summaryMetric{
points: []float64{parsedMetric.asFloat}, // TODO: Should this be a different type?
weights: []float64{1},
}
} else {
instrument.sets[parsedMetric.description] = summaryMetric{
points: append(existing.points, parsedMetric.asFloat),
weights: append(existing.weights, 1),
}
}

case TimingType, HistogramType, DistributionType:
category := p.observerCategoryFor(parsedMetric.description.metricType)
switch category.method {
Expand Down Expand Up @@ -380,7 +403,7 @@ func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags b

inType := MetricType(parts[1])
switch inType {
case CounterType, GaugeType, HistogramType, TimingType, DistributionType:
case CounterType, GaugeType, HistogramType, TimingType, DistributionType, SetType:
result.description.metricType = inType
default:
return result, fmt.Errorf("unsupported metric type: %s", inType)
Expand Down
56 changes: 56 additions & 0 deletions receiver/statsdreceiver/internal/protocol/statsd_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@ func Test_ParseMessageToMetric(t *testing.T) {
false,
"h", 0, nil, nil, 0),
},
{
name: "simple set",
input: "test.metric.set:42|s",
wantMetric: testStatsDMetric(
"test.metric.set",
42,
false,
"s", 0, nil, nil, 0),
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -1913,3 +1922,50 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) {
})
}
}

func TestStatsDParser_AggregateSets(t *testing.T) {
timeNowFunc = func() time.Time {
return time.Unix(711, 0)
}

tests := []struct {
name string
input []string
expectedSummaries map[statsDMetricDescription]summaryMetric
err error
}{
{
name: "set",
input: []string{
"statsdTestMetric1:300|s|#mykey:myvalue",
"statsdTestMetric1:100|s|#mykey:myvalue",
"statsdTestMetric1:300|s|#mykey:myvalue",
"statsdTestMetric1:200|s|#mykey:myvalue",
},
expectedSummaries: map[statsDMetricDescription]summaryMetric{
testDescription("statsdTestMetric1", "s",
[]string{"mykey"}, []string{"myvalue"}): {
points: []float64{300, 100, 300, 200},
weights: []float64{1, 1, 1, 1},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}}))
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
addrKey := newNetAddr(addr)
for _, line := range tt.input {
err = p.Aggregate(line, addr)
}
if tt.err != nil {
assert.Equal(t, tt.err, err)
} else {
assert.EqualValues(t, tt.expectedSummaries, p.instrumentsByAddress[addrKey].sets)
}
})
}
}

0 comments on commit ff18673

Please sign in to comment.