receiver/prometheus: add metricGroup.toDistributionPoint pdata conversion #3667

Merged
52 changes: 51 additions & 1 deletion receiver/prometheusreceiver/internal/otlp_metricfamily.go
@@ -96,6 +96,56 @@ func (mf *metricFamilyPdata) updateLabelKeys(ls labels.Labels) {
}
}

func (mf *metricFamilyPdata) getGroupKey(ls labels.Labels) string {
mf.updateLabelKeys(ls)
return dpgSignature(mf.labelKeysOrdered, ls)
}

func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest *pdata.HistogramDataPointSlice) bool {
if !mg.hasCount || len(mg.complexValue) == 0 {
return false
}

mg.sortPoints()

// for OCAgent Proto, the bounds won't include +inf
// TODO: (@odeke-em) should we also check OpenTelemetry Pdata for bucket bounds?

Hey @bogdandrutu, that code is a straight translation of the prior code. To avoid too many moving parts and to keep parity with the existing code, I am going to file an issue to remind me to investigate and finish that TODO, which was added ages ago.


I filed https://github.com/open-telemetry/opentelemetry-collector/issues/3684, and I don't think this change is a blocker for this PR, since this PR is a direct translation to use pdata. Thanks for the review and for the answer, @bogdandrutu!

bounds := make([]float64, len(mg.complexValue)-1)
bucketCounts := make([]uint64, len(mg.complexValue))

for i := 0; i < len(mg.complexValue); i++ {
if i != len(mg.complexValue)-1 {
// no need to add +inf as a bound, matching the oc proto
bounds[i] = mg.complexValue[i].boundary
}
adjustedCount := mg.complexValue[i].value
if i != 0 {
adjustedCount -= mg.complexValue[i-1].value
}
bucketCounts[i] = uint64(adjustedCount)
}

point := dest.AppendEmpty()
point.SetExplicitBounds(bounds)
point.SetCount(uint64(mg.count))
point.SetSum(mg.sum)
point.SetBucketCounts(bucketCounts)
// The timestamp MUST be retrieved in milliseconds and converted to nanoseconds.
tsNanos := pdata.Timestamp(mg.ts * 1e6)
point.SetStartTimestamp(tsNanos)
point.SetTimestamp(tsNanos)
populateLabelValuesPdata(orderedLabelKeys, mg.ls, point.LabelsMap())

return true
}
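To make the bucket arithmetic above concrete, here is a standalone sketch (sample values, not from this PR) of how cumulative Prometheus "le" buckets become explicit bounds and per-bucket counts; as with the oc proto, the +Inf boundary is never emitted as an explicit bound:

package main

import "fmt"

func main() {
	// Cumulative "le" buckets as scraped: le=0.5 -> 3, le=1 -> 7, le=+Inf -> 10.
	cumulative := []float64{3, 7, 10}
	bounds := []float64{0.5, 1} // the +Inf boundary stays implicit
	bucketCounts := make([]uint64, len(cumulative))
	for i, v := range cumulative {
		if i > 0 {
			v -= cumulative[i-1] // cumulative value minus the previous bucket's value
		}
		bucketCounts[i] = uint64(v)
	}
	fmt.Println(bounds, bucketCounts) // [0.5 1] [3 4 3]
}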

func populateLabelValuesPdata(orderedKeys []string, ls labels.Labels, dest pdata.StringMap) {
src := ls.Map()
for _, key := range orderedKeys {
dest.Insert(key, src[key])
}
}
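As a quick usage sketch of populateLabelValuesPdata (hypothetical values, not part of this PR): labels.Labels.Map() returns a plain map, so an ordered key that is missing from the scraped labels comes back as "" and is inserted with an empty value.

ls := labels.Labels{{Name: "a", Value: "A"}}
dest := pdata.NewStringMap()
populateLabelValuesPdata([]string{"a", "b"}, ls, dest)
// dest now holds {"a": "A", "b": ""}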

// Purposefully being referenced to avoid lint warnings about being "unused".
var _ = (*metricFamilyPdata)(nil).updateLabelKeys

@@ -112,8 +162,8 @@ func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels.
mg = &metricGroupPdata{
family: mf,
metricGroup: metricGroup{
ls: ls,
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
},
}
226 changes: 187 additions & 39 deletions receiver/prometheusreceiver/internal/otlp_metricfamily_test.go
@@ -15,13 +15,17 @@
package internal

import (
"fmt"
"testing"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/scrape"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/model/pdata"
)

type byLookupMetadataCache map[string]scrape.MetricMetadata
@@ -35,46 +39,46 @@ func (bmc byLookupMetadataCache) SharedLabels() labels.Labels {
return nil
}

func TestIsCumulativeEquivalence(t *testing.T) {
mc := byLookupMetadataCache{
"counter": scrape.MetricMetadata{
Metric: "cr",
Type: textparse.MetricTypeCounter,
Help: "This is some help",
Unit: "By",
},
"gauge": scrape.MetricMetadata{
Metric: "ge",
Type: textparse.MetricTypeGauge,
Help: "This is some help",
Unit: "1",
},
"gaugehistogram": scrape.MetricMetadata{
Metric: "gh",
Type: textparse.MetricTypeGaugeHistogram,
Help: "This is some help",
Unit: "?",
},
"histogram": scrape.MetricMetadata{
Metric: "hg",
Type: textparse.MetricTypeHistogram,
Help: "This is some help",
Unit: "ms",
},
"summary": scrape.MetricMetadata{
Metric: "s",
Type: textparse.MetricTypeSummary,
Help: "This is some help",
Unit: "?",
},
"unknown": scrape.MetricMetadata{
Metric: "u",
Type: textparse.MetricTypeUnknown,
Help: "This is some help",
Unit: "?",
},
}
var mc = byLookupMetadataCache{
"counter": scrape.MetricMetadata{
Metric: "cr",
Type: textparse.MetricTypeCounter,
Help: "This is some help",
Unit: "By",
},
"gauge": scrape.MetricMetadata{
Metric: "ge",
Type: textparse.MetricTypeGauge,
Help: "This is some help",
Unit: "1",
},
"gaugehistogram": scrape.MetricMetadata{
Metric: "gh",
Type: textparse.MetricTypeGaugeHistogram,
Help: "This is some help",
Unit: "?",
},
"histogram": scrape.MetricMetadata{
Metric: "hg",
Type: textparse.MetricTypeHistogram,
Help: "This is some help",
Unit: "ms",
},
"summary": scrape.MetricMetadata{
Metric: "s",
Type: textparse.MetricTypeSummary,
Help: "This is some help",
Unit: "?",
},
"unknown": scrape.MetricMetadata{
Metric: "u",
Type: textparse.MetricTypeUnknown,
Help: "This is some help",
Unit: "?",
},
}

func TestIsCumulativeEquivalence(t *testing.T) {
tests := []struct {
name string
want bool
@@ -98,3 +102,147 @@ func TestIsCumulativeEquivalence(t *testing.T) {
})
}
}

func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
type scrape struct {
at int64
value float64
metric string
}
tests := []struct {
name string
labels labels.Labels
scrapes []*scrape
want func() pdata.HistogramDataPoint
}{
{
name: "histogram",
labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "le", Value: "0.75"}, {Name: "b", Value: "B"}},
scrapes: []*scrape{
{at: 11, value: 10, metric: "histogram_count"},
{at: 11, value: 1004.78, metric: "histogram_sum"},
{at: 13, value: 33.7, metric: "value"},
},
want: func() pdata.HistogramDataPoint {
point := pdata.NewHistogramDataPoint()
point.SetCount(10)
point.SetSum(1004.78)
point.SetTimestamp(11 * 1e6) // the time in milliseconds -> nanoseconds.
point.SetBucketCounts([]uint64{33})
point.SetExplicitBounds([]float64{})
point.SetStartTimestamp(11 * 1e6)
labelsMap := point.LabelsMap()
labelsMap.Insert("a", "A")
labelsMap.Insert("b", "B")
return point
},
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}

require.Equal(t, 1, len(mp.groups), "Expecting exactly 1 groupKey")
groupKey := mp.getGroupKey(tt.labels.Copy())
require.NotNil(t, mp.groups[groupKey], "Expecting the groupKey to have a value given key:: "+groupKey)

hdpL := pdata.NewHistogramDataPointSlice()
require.True(t, mp.groups[groupKey].toDistributionPoint(mp.labelKeysOrdered, &hdpL))
require.Equal(t, 1, hdpL.Len(), "Exactly one point expected")
got := hdpL.At(0)
want := tt.want()
require.Equal(t, want, got, "Expected the points to be equal")
})
}
}

func TestMetricGroupData_toDistributionPointEquivalence(t *testing.T) {
type scrape struct {
at int64
value float64
metric string
}
tests := []struct {
name string
labels labels.Labels
scrapes []*scrape
}{
{
name: "histogram",
labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "le", Value: "0.75"}, {Name: "b", Value: "B"}},
scrapes: []*scrape{
{at: 11, value: 10, metric: "histogram_count"},
{at: 11, value: 1004.78, metric: "histogram_sum"},
{at: 13, value: 33.7, metric: "value"},
},
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily)
mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}
groupKey := mf.getGroupKey(tt.labels.Copy())
ocTimeseries := mf.groups[groupKey].toDistributionTimeSeries(mf.labelKeysOrdered)
hdpL := pdata.NewHistogramDataPointSlice()
require.True(t, mp.groups[groupKey].toDistributionPoint(mp.labelKeysOrdered, &hdpL))
require.Equal(t, len(ocTimeseries.Points), hdpL.Len(), "They should have the exact same number of points")
require.Equal(t, 1, hdpL.Len(), "Exactly one point expected")
ocPoint := ocTimeseries.Points[0]
pdataPoint := hdpL.At(0)
// 1. Ensure that the start timestamps are equal.
require.Equal(t, ocTimeseries.GetStartTimestamp().AsTime(), pdataPoint.StartTimestamp().AsTime(), "The start timestamps must be equal")
// 2. Ensure that the count is equal.
ocHistogram := ocPoint.GetDistributionValue()
require.Equal(t, ocHistogram.GetCount(), int64(pdataPoint.Count()), "Count must be equal")
// 3. Ensure that the sum is equal.
require.Equal(t, ocHistogram.GetSum(), pdataPoint.Sum(), "Sum must be equal")
// 4. Ensure that the point's timestamp is equal to that from the OpenCensusProto data point.
require.Equal(t, ocPoint.GetTimestamp().AsTime(), pdataPoint.Timestamp().AsTime(), "Point timestamps must be equal")
// 5. Ensure that both sides have the same number of buckets.
require.Equal(t, len(ocHistogram.GetBuckets()), len(pdataPoint.BucketCounts()), "Bucket counts must have the same length")
var ocBucketCounts []uint64
for i, bucket := range ocHistogram.GetBuckets() {
ocBucketCounts = append(ocBucketCounts, uint64(bucket.GetCount()))

// 6. Ensure that the exemplars match.
ocExemplar := bucket.Exemplar
if ocExemplar == nil {
if i >= pdataPoint.Exemplars().Len() { // The pdata point also has no exemplar at this index, so both sides agree.
continue
}
// Otherwise an exemplar is present for the pdata data point but not for the OpenCensus Proto histogram.
t.Fatalf("Exemplar #%d is ONLY present in the pdata point but not in the OpenCensus Proto histogram", i)
}
pdataExemplar := pdataPoint.Exemplars().At(i)
msgPrefix := fmt.Sprintf("Exemplar #%d:: ", i)
require.Equal(t, ocExemplar.Timestamp.AsTime(), pdataExemplar.Timestamp().AsTime(), msgPrefix+"timestamp mismatch")
require.Equal(t, ocExemplar.Value, pdataExemplar.Value(), msgPrefix+"value mismatch")
pdataExemplarAttachments := make(map[string]string)
pdataExemplar.FilteredLabels().Range(func(key, value string) bool {
pdataExemplarAttachments[key] = value
return true
})
require.Equal(t, ocExemplar.Attachments, pdataExemplarAttachments, msgPrefix+"attachments mismatch")
}
// 7. Ensure that the bucket counts are the same.
require.Equal(t, ocBucketCounts, pdataPoint.BucketCounts(), "Bucket counts must be equal")
// 8. Ensure that the labels all match up.
ocStringMap := pdata.NewStringMap()
for i, labelValue := range ocTimeseries.LabelValues {
ocStringMap.Insert(mf.labelKeysOrdered[i], labelValue.Value)
}
require.Equal(t, ocStringMap.Sort(), pdataPoint.LabelsMap().Sort())
})
}
}
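Both new tests can be run on their own with, for example:

go test -run TestMetricGroupData ./receiver/prometheusreceiver/internal/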