Skip to content

Commit

Permalink
receiver/prometheus: add metricGroup.toDistributionPoint pdata conversion
Browse files Browse the repository at this point in the history

Implements metricGroupPdata toDistributionPoint and added unit tests
as well as equivalence tests to ensure the migration will render
the same results.

Updates #3137
Updates PR #3427
  • Loading branch information
odeke-em committed Jul 21, 2021
1 parent f46871a commit b34aa37
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 40 deletions.
52 changes: 51 additions & 1 deletion receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,56 @@ func (mf *metricFamilyPdata) updateLabelKeys(ls labels.Labels) {
}
}

// getGroupKey registers any previously-unseen label keys from ls on the
// family, then returns the signature that identifies the metric group ls
// belongs to, computed over the family's ordered label keys.
func (mf *metricFamilyPdata) getGroupKey(ls labels.Labels) string {
	// Keys must be registered before computing the signature so the key
	// ordering is consistent for every group in this family.
	mf.updateLabelKeys(ls)
	key := dpgSignature(mf.labelKeysOrdered, ls)
	return key
}

// toDistributionPoint converts the group's accumulated bucket samples into a
// single histogram data point appended to dest, returning true on success.
// It returns false — leaving dest untouched — when the group has no count or
// no bucket (complex) values.
func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest *pdata.HistogramDataPointSlice) bool {
	if !mg.hasCount || len(mg.complexValue) == 0 {
		return false
	}

	mg.sortPoints()

	// Following the OCAgent proto convention, the explicit bounds exclude the
	// final +inf boundary, so there is one fewer bound than bucket counts.
	// TODO: (@odeke-em) should we also check OpenTelemetry Pdata for bucket bounds?
	numBuckets := len(mg.complexValue)
	bounds := make([]float64, numBuckets-1)
	bucketCounts := make([]uint64, numBuckets)

	// Prometheus reports cumulative bucket values; convert them to per-bucket
	// counts by subtracting the previous cumulative value as we go.
	var prevCumulative float64
	for i, dp := range mg.complexValue {
		if i < numBuckets-1 {
			// The last (+inf) boundary is intentionally dropped (see above).
			bounds[i] = dp.boundary
		}
		bucketCounts[i] = uint64(dp.value - prevCumulative)
		prevCumulative = dp.value
	}

	point := dest.AppendEmpty()
	point.SetExplicitBounds(bounds)
	point.SetCount(uint64(mg.count))
	point.SetSum(mg.sum)
	point.SetBucketCounts(bucketCounts)
	// mg.ts carries milliseconds; pdata timestamps are nanoseconds.
	tsNanos := pdata.Timestamp(mg.ts * 1e6)
	point.SetStartTimestamp(tsNanos)
	point.SetTimestamp(tsNanos)
	populateLabelValuesPdata(orderedLabelKeys, mg.ls, point.LabelsMap())

	return true
}

// populateLabelValuesPdata copies, for each key in orderedKeys, the matching
// value from ls into dest. Keys absent from ls are inserted with an empty
// string value, keeping dest aligned with the family's ordered key set.
func populateLabelValuesPdata(orderedKeys []string, ls labels.Labels, dest pdata.StringMap) {
	byName := ls.Map()
	for _, k := range orderedKeys {
		dest.Insert(k, byName[k])
	}
}

// Purposefully being referenced to avoid lint warnings about being "unused".
// NOTE(review): getGroupKey above already calls updateLabelKeys; confirm this
// blank reference is still needed and drop it once the linter no longer flags it.
var _ = (*metricFamilyPdata)(nil).updateLabelKeys

Expand All @@ -112,8 +162,8 @@ func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels.
mg = &metricGroupPdata{
family: mf,
metricGroup: metricGroup{
ls: ls,
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
},
}
Expand Down
226 changes: 187 additions & 39 deletions receiver/prometheusreceiver/internal/otlp_metricfamily_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
package internal

import (
"fmt"
"testing"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/scrape"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/model/pdata"
)

type byLookupMetadataCache map[string]scrape.MetricMetadata
Expand All @@ -35,46 +39,46 @@ func (bmc byLookupMetadataCache) SharedLabels() labels.Labels {
return nil
}

func TestIsCumulativeEquivalence(t *testing.T) {
mc := byLookupMetadataCache{
"counter": scrape.MetricMetadata{
Metric: "cr",
Type: textparse.MetricTypeCounter,
Help: "This is some help",
Unit: "By",
},
"gauge": scrape.MetricMetadata{
Metric: "ge",
Type: textparse.MetricTypeGauge,
Help: "This is some help",
Unit: "1",
},
"gaugehistogram": scrape.MetricMetadata{
Metric: "gh",
Type: textparse.MetricTypeGaugeHistogram,
Help: "This is some help",
Unit: "?",
},
"histogram": scrape.MetricMetadata{
Metric: "hg",
Type: textparse.MetricTypeHistogram,
Help: "This is some help",
Unit: "ms",
},
"summary": scrape.MetricMetadata{
Metric: "s",
Type: textparse.MetricTypeSummary,
Help: "This is some help",
Unit: "?",
},
"unknown": scrape.MetricMetadata{
Metric: "u",
Type: textparse.MetricTypeUnknown,
Help: "This is some help",
Unit: "?",
},
}
// mc is a shared, package-level metadata cache stub keyed by metric-family
// name. Tests use it to resolve each family's Metric/Type/Help/Unit metadata.
var mc = byLookupMetadataCache{
	"counter": scrape.MetricMetadata{
		Metric: "cr",
		Type:   textparse.MetricTypeCounter,
		Help:   "This is some help",
		Unit:   "By",
	},
	"gauge": scrape.MetricMetadata{
		Metric: "ge",
		Type:   textparse.MetricTypeGauge,
		Help:   "This is some help",
		Unit:   "1",
	},
	"gaugehistogram": scrape.MetricMetadata{
		Metric: "gh",
		Type:   textparse.MetricTypeGaugeHistogram,
		Help:   "This is some help",
		Unit:   "?",
	},
	"histogram": scrape.MetricMetadata{
		Metric: "hg",
		Type:   textparse.MetricTypeHistogram,
		Help:   "This is some help",
		Unit:   "ms",
	},
	"summary": scrape.MetricMetadata{
		Metric: "s",
		Type:   textparse.MetricTypeSummary,
		Help:   "This is some help",
		Unit:   "?",
	},
	"unknown": scrape.MetricMetadata{
		Metric: "u",
		Type:   textparse.MetricTypeUnknown,
		Help:   "This is some help",
		Unit:   "?",
	},
}

func TestIsCumulativeEquivalence(t *testing.T) {
tests := []struct {
name string
want bool
Expand All @@ -98,3 +102,147 @@ func TestIsCumulativeEquivalence(t *testing.T) {
})
}
}

// TestMetricGroupData_toDistributionUnitTest drives scrapes through a
// metricFamilyPdata and checks the histogram point produced by
// toDistributionPoint against a hand-built expected point.
func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
	type scrape struct {
		at     int64
		value  float64
		metric string
	}
	cases := []struct {
		name    string
		labels  labels.Labels
		scrapes []*scrape
		want    func() pdata.HistogramDataPoint
	}{
		{
			name:   "histogram",
			labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "le", Value: "0.75"}, {Name: "b", Value: "B"}},
			scrapes: []*scrape{
				{at: 11, value: 10, metric: "histogram_count"},
				{at: 11, value: 1004.78, metric: "histogram_sum"},
				{at: 13, value: 33.7, metric: "value"},
			},
			want: func() pdata.HistogramDataPoint {
				point := pdata.NewHistogramDataPoint()
				point.SetCount(10)
				point.SetSum(1004.78)
				point.SetTimestamp(11 * 1e6) // the time in milliseconds -> nanoseconds.
				point.SetBucketCounts([]uint64{33})
				point.SetExplicitBounds([]float64{})
				point.SetStartTimestamp(11 * 1e6)
				labelsMap := point.LabelsMap()
				labelsMap.Insert("a", "A")
				labelsMap.Insert("b", "B")
				return point
			},
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			family := newMetricFamilyPdata(tc.name, mc).(*metricFamilyPdata)
			for _, s := range tc.scrapes {
				require.NoError(t, family.Add(s.metric, tc.labels.Copy(), s.at, s.value))
			}

			require.Equal(t, 1, len(family.groups), "Expecting exactly 1 groupKey")
			groupKey := family.getGroupKey(tc.labels.Copy())
			require.NotNil(t, family.groups[groupKey], "Expecting the groupKey to have a value given key:: "+groupKey)

			points := pdata.NewHistogramDataPointSlice()
			require.True(t, family.groups[groupKey].toDistributionPoint(family.labelKeysOrdered, &points))
			require.Equal(t, 1, points.Len(), "Exactly one point expected")
			got := points.At(0)
			require.Equal(t, tc.want(), got, "Expected the points to be equal")
		})
	}
}

// TestMetricGroupData_toDistributionPointEquivalence feeds identical scrapes
// through the legacy OpenCensus conversion (metricFamily) and the pdata
// conversion (metricFamilyPdata), then asserts the resulting histogram points
// agree field by field, so the migration renders the same results.
func TestMetricGroupData_toDistributionPointEquivalence(t *testing.T) {
	type scrape struct {
		at     int64
		value  float64
		metric string
	}
	tests := []struct {
		name    string
		labels  labels.Labels
		scrapes []*scrape
	}{
		{
			name:   "histogram",
			labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "le", Value: "0.75"}, {Name: "b", Value: "B"}},
			scrapes: []*scrape{
				{at: 11, value: 10, metric: "histogram_count"},
				{at: 11, value: 1004.78, metric: "histogram_sum"},
				{at: 13, value: 33.7, metric: "value"},
			},
		},
	}

	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily)
			mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
			for _, tv := range tt.scrapes {
				require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
				require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
			}
			groupKey := mf.getGroupKey(tt.labels.Copy())
			ocTimeseries := mf.groups[groupKey].toDistributionTimeSeries(mf.labelKeysOrdered)
			hdpL := pdata.NewHistogramDataPointSlice()
			require.True(t, mp.groups[groupKey].toDistributionPoint(mp.labelKeysOrdered, &hdpL))
			require.Equal(t, len(ocTimeseries.Points), hdpL.Len(), "They should have the exact same number of points")
			require.Equal(t, 1, hdpL.Len(), "Exactly one point expected")
			ocPoint := ocTimeseries.Points[0]
			pdataPoint := hdpL.At(0)
			// 1. Ensure that the startTimestamps are equal.
			// Previously this read pdataPoint.Timestamp(), which only passed
			// because toDistributionPoint sets start == timestamp; compare the
			// start timestamp explicitly so the check matches its intent.
			require.Equal(t, ocTimeseries.GetStartTimestamp().AsTime(), pdataPoint.StartTimestamp().AsTime(), "The start timestamps must be equal")
			// 2. Ensure that the count is equal.
			ocHistogram := ocPoint.GetDistributionValue()
			require.Equal(t, ocHistogram.GetCount(), int64(pdataPoint.Count()), "Count must be equal")
			// 3. Ensure that the sum is equal.
			require.Equal(t, ocHistogram.GetSum(), pdataPoint.Sum(), "Sum must be equal")
			// 4. Ensure that the point's timestamp is equal to that from the OpenCensusProto data point.
			require.Equal(t, ocPoint.GetTimestamp().AsTime(), pdataPoint.Timestamp().AsTime(), "Point timestamps must be equal")
			// 5. Ensure that the bucket-count slices have the same length.
			require.Equal(t, len(ocHistogram.GetBuckets()), len(pdataPoint.BucketCounts()), "Bucket counts must have the same length")
			var ocBucketCounts []uint64
			for i, bucket := range ocHistogram.GetBuckets() {
				ocBucketCounts = append(ocBucketCounts, uint64(bucket.GetCount()))

				// 6. Ensure that the exemplars match.
				// NOTE(review): this pairs bucket index i with exemplar index i,
				// which assumes at most one exemplar per bucket in order — confirm.
				ocExemplar := bucket.Exemplar
				if ocExemplar == nil {
					if i >= pdataPoint.Exemplars().Len() { // Both have the exact same number of exemplars.
						continue
					}
					// Otherwise an exemplar is present for the pdata data point but not for the OpenCensus Proto histogram.
					t.Fatalf("Exemplar #%d is ONLY present in the pdata point but not in the OpenCensus Proto histogram", i)
				}
				pdataExemplar := pdataPoint.Exemplars().At(i)
				msgPrefix := fmt.Sprintf("Exemplar #%d:: ", i)
				require.Equal(t, ocExemplar.Timestamp.AsTime(), pdataExemplar.Timestamp().AsTime(), msgPrefix+"timestamp mismatch")
				require.Equal(t, ocExemplar.Value, pdataExemplar.Value(), msgPrefix+"value mismatch")
				pdataExemplarAttachments := make(map[string]string)
				pdataExemplar.FilteredLabels().Range(func(key, value string) bool {
					pdataExemplarAttachments[key] = value
					return true
				})
				require.Equal(t, ocExemplar.Attachments, pdataExemplarAttachments, msgPrefix+"attachments mismatch")
			}
			// 7. Ensure that the per-bucket counts are equal.
			require.Equal(t, ocBucketCounts, pdataPoint.BucketCounts(), "Bucket counts must be equal")
			// 8. Ensure that the labels all match up.
			ocStringMap := pdata.NewStringMap()
			for i, labelValue := range ocTimeseries.LabelValues {
				ocStringMap.Insert(mf.labelKeysOrdered[i], labelValue.Value)
			}
			require.Equal(t, ocStringMap.Sort(), pdataPoint.LabelsMap().Sort())
		})
	}
}

0 comments on commit b34aa37

Please sign in to comment.