Skip to content

Commit

Permalink
receiver/prometheus: add metricGroup.toSummaryPoint pdata conversion
Browse files Browse the repository at this point in the history
Implements metricGroupPdata toSummaryPoint and added unit tests
as well as equivalence tests to ensure the migration will render
the same results.

Updates #3137
Depends on PR open-telemetry#3667
Updates PR open-telemetry#3427
  • Loading branch information
odeke-em committed Jul 18, 2021
1 parent f3ad1be commit 8444434
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 1 deletion.
33 changes: 33 additions & 0 deletions receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,39 @@ func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest
return true
}

func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pdata.SummaryDataPointSlice) bool {
// expecting count to be provided, however, in the following two cases, they can be missed.
// 1. data is corrupted
// 2. ignored by startValue evaluation
if !mg.hasCount {
return false
}

mg.sortPoints()

point := dest.AppendEmpty()
quantileValues := point.QuantileValues()
for _, p := range mg.complexValue {
quantile := quantileValues.AppendEmpty()
quantile.SetValue(p.value)
quantile.SetQuantile(p.boundary * 100)
}

// Based on the summary description from https://prometheus.io/docs/concepts/metric_types/#summary
// the quantiles are calculated over a sliding time window, however, the count is the total count of
// observations and the corresponding sum is a sum of all observed values, thus the sum and count used
// at the global level of the metricspb.SummaryValue
// The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds.
tsNanos := pdata.Timestamp(mg.ts * 1e6)
point.SetStartTimestamp(tsNanos)
point.SetTimestamp(tsNanos)
point.SetSum(mg.sum)
point.SetCount(uint64(mg.count))
populateLabelValuesPdata(orderedLabelKeys, mg.ls, point.LabelsMap())

return true
}

func populateLabelValuesPdata(orderedKeys []string, ls labels.Labels, dest pdata.StringMap) {
src := ls.Map()
for _, key := range orderedKeys {
Expand Down
201 changes: 200 additions & 1 deletion receiver/prometheusreceiver/internal/otlp_metricfamily_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var mc = byLookupMetadataCache{
Metric: "s",
Type: textparse.MetricTypeSummary,
Help: "This is some help",
Unit: "?",
Unit: "ms",
},
"unknown": scrape.MetricMetadata{
Metric: "u",
Expand Down Expand Up @@ -247,3 +247,202 @@ func TestMetricGroupData_toDistributionPointEquivalence(t *testing.T) {
})
}
}

func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
type scrape struct {
at int64
value float64
metric string
}

type labelsScrapes struct {
labels labels.Labels
scrapes []*scrape
}
tests := []struct {
name string
labelsScrapes []*labelsScrapes
want func() pdata.SummaryDataPoint
}{
{
name: "summary",
labelsScrapes: []*labelsScrapes{
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.0"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 10, value: 10, metric: "histogram_count"},
{at: 10, value: 12, metric: "histogram_sum"},
{at: 10, value: 8, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.75"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 11, value: 10, metric: "histogram_count"},
{at: 11, value: 1004.78, metric: "histogram_sum"},
{at: 11, value: 33.7, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.50"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 12, value: 10, metric: "histogram_count"},
{at: 12, value: 13, metric: "histogram_sum"},
{at: 12, value: 27, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.90"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 13, value: 10, metric: "histogram_count"},
{at: 13, value: 14, metric: "histogram_sum"},
{at: 13, value: 56, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.99"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 14, value: 10, metric: "histogram_count"},
{at: 14, value: 15, metric: "histogram_sum"},
{at: 14, value: 82, metric: "value"},
},
},
},
want: func() pdata.SummaryDataPoint {
point := pdata.NewSummaryDataPoint()
point.SetCount(10)
point.SetSum(15)
qtL := point.QuantileValues()
qn0 := qtL.AppendEmpty()
qn0.SetQuantile(0)
qn0.SetValue(8)
qn50 := qtL.AppendEmpty()
qn50.SetQuantile(50)
qn50.SetValue(27)
qn75 := qtL.AppendEmpty()
qn75.SetQuantile(75)
qn75.SetValue(33.7)
qn90 := qtL.AppendEmpty()
qn90.SetQuantile(90)
qn90.SetValue(56)
qn99 := qtL.AppendEmpty()
qn99.SetQuantile(99)
qn99.SetValue(82)
point.SetTimestamp(14 * 1e6) // the time in milliseconds -> nanoseconds.
point.SetStartTimestamp(14 * 1e6)
labelsMap := point.LabelsMap()
labelsMap.Insert("a", "A")
labelsMap.Insert("b", "B")
return point
},
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
for _, lbs := range tt.labelsScrapes {
for _, scrape := range lbs.scrapes {
require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value))
}
}

require.Equal(t, 1, len(mp.groups), "Expecting exactly 1 groupKey")
// Get the lone group key.
groupKey := ""
for key := range mp.groups {
groupKey = key
}
require.NotNil(t, mp.groups[groupKey], "Expecting the groupKey to have a value given key:: "+groupKey)

sdpL := pdata.NewSummaryDataPointSlice()
require.True(t, mp.groups[groupKey].toSummaryPoint(mp.labelKeysOrdered, &sdpL))
require.Equal(t, 1, sdpL.Len(), "Exactly one point expected")
got := sdpL.At(0)
want := tt.want()
require.Equal(t, want, got, "Expected the points to be equal")
})
}
}

func TestMetricGroupData_toSummaryPointEquivalence(t *testing.T) {
type scrape struct {
at int64
value float64
metric string
}
tests := []struct {
name string
labels labels.Labels
scrapes []*scrape
}{
{
name: "summary",
labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.75"}, {Name: "b", Value: "B"}},
scrapes: []*scrape{
{at: 11, value: 10, metric: "summary_count"},
{at: 11, value: 1004.78, metric: "summary_sum"},
{at: 13, value: 33.7, metric: "value"},
},
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily)
mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}
groupKey := mf.getGroupKey(tt.labels.Copy())
ocTimeseries := mf.groups[groupKey].toSummaryTimeSeries(mf.labelKeysOrdered)
sdpL := pdata.NewSummaryDataPointSlice()
require.True(t, mp.groups[groupKey].toSummaryPoint(mp.labelKeysOrdered, &sdpL))
require.Equal(t, len(ocTimeseries.Points), sdpL.Len(), "They should have the exact same number of points")
require.Equal(t, 1, sdpL.Len(), "Exactly one point expected")
ocPoint := ocTimeseries.Points[0]
pdataPoint := sdpL.At(0)
// 1. Ensure that the startTimestamps are equal.
require.Equal(t, ocTimeseries.GetStartTimestamp().AsTime(), pdataPoint.Timestamp().AsTime(), "The timestamp must be equal")
// 2. Ensure that the count is equal.
ocSummary := ocPoint.GetSummaryValue()
if false {
t.Logf("\nOcSummary: %#v\nPdSummary: %#v\n\nocPoint: %#v\n", ocSummary, pdataPoint, ocPoint.GetSummaryValue())
return
}
require.Equal(t, ocSummary.GetCount().GetValue(), int64(pdataPoint.Count()), "Count must be equal")
// 3. Ensure that the sum is equal.
require.Equal(t, ocSummary.GetSum().GetValue(), pdataPoint.Sum(), "Sum must be equal")
// 4. Ensure that the point's timestamp is equal to that from the OpenCensusProto data point.
require.Equal(t, ocPoint.GetTimestamp().AsTime(), pdataPoint.Timestamp().AsTime(), "Point timestamps must be equal")
// 5. Ensure that the labels all match up.
ocStringMap := pdata.NewStringMap()
for i, labelValue := range ocTimeseries.LabelValues {
ocStringMap.Insert(mf.labelKeysOrdered[i], labelValue.Value)
}
require.Equal(t, ocStringMap.Sort(), pdataPoint.LabelsMap().Sort())
// 6. Ensure that the quantile values all match up.
ocQuantiles := ocSummary.GetSnapshot().GetPercentileValues()
pdataQuantiles := pdataPoint.QuantileValues()
require.Equal(t, len(ocQuantiles), pdataQuantiles.Len())
for i, ocQuantile := range ocQuantiles {
pdataQuantile := pdataQuantiles.At(i)
require.Equal(t, ocQuantile.Percentile, pdataQuantile.Quantile(), "The quantile percentiles must match")
require.Equal(t, ocQuantile.Value, pdataQuantile.Value(), "The quantile values must match")
}
})
}
}

0 comments on commit 8444434

Please sign in to comment.