Skip to content

Commit

Permalink
Add support for separating unknown type
Browse files Browse the repository at this point in the history
  • Loading branch information
khanhntd committed Dec 21, 2022
1 parent 47db6ba commit a610220
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 48 deletions.
76 changes: 42 additions & 34 deletions receiver/prometheusreceiver/internal/metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,26 @@ import (
"sort"
"strings"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

const (
traceIDKey = "trace_id"
spanIDKey = "span_id"
traceIDKey = "trace_id"
spanIDKey = "span_id"
PromMetricType = "prom_metric_type"
)

type metricFamily struct {
mtype pmetric.MetricType
ptype textparse.MetricType
// isMonotonic only applies to sums
isMonotonic bool
groups map[uint64]*metricGroup
Expand All @@ -50,7 +52,6 @@ type metricFamily struct {
// a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for
// simple types like counter and gauge, each data point is a group of itself
type metricGroup struct {
mtype pmetric.MetricType
ts int64
ls labels.Labels
count float64
Expand All @@ -71,6 +72,7 @@ func newMetricFamily(metricName string, mc scrape.MetricMetadataStore, logger *z

return &metricFamily{
mtype: mtype,
ptype: metadata.Type,
isMonotonic: isMonotonic,
groups: make(map[uint64]*metricGroup),
name: familyName,
Expand All @@ -95,7 +97,7 @@ func (mg *metricGroup) sortPoints() {
})
}

func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice) {
func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice, mType pmetric.MetricType, pType textparse.MetricType) {
if !mg.hasCount || len(mg.complexValue) == 0 {
return
}
Expand Down Expand Up @@ -143,7 +145,7 @@ func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice)
tsNanos := timestampFromMs(mg.ts)
point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
point.SetTimestamp(tsNanos)
populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes())
populateAttributes(mType, pType, mg.ls, point.Attributes())
mg.setExemplars(point.Exemplars())
}

Expand All @@ -156,7 +158,7 @@ func (mg *metricGroup) setExemplars(exemplars pmetric.ExemplarSlice) {
}
}

func (mg *metricGroup) toSummaryPoint(dest pmetric.SummaryDataPointSlice) {
func (mg *metricGroup) toSummaryPoint(dest pmetric.SummaryDataPointSlice, mType pmetric.MetricType, pType textparse.MetricType) {
// expecting count to be provided, however, in the following two cases, they can be missed.
// 1. data is corrupted
// 2. ignored by startValue evaluation
Expand Down Expand Up @@ -197,50 +199,56 @@ func (mg *metricGroup) toSummaryPoint(dest pmetric.SummaryDataPointSlice) {
tsNanos := timestampFromMs(mg.ts)
point.SetTimestamp(tsNanos)
point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
populateAttributes(pmetric.MetricTypeSummary, mg.ls, point.Attributes())
populateAttributes(mType, pType, mg.ls, point.Attributes())
}

func (mg *metricGroup) toNumberDataPoint(dest pmetric.NumberDataPointSlice) {
func (mg *metricGroup) toNumberDataPoint(dest pmetric.NumberDataPointSlice, mType pmetric.MetricType, pType textparse.MetricType) {
tsNanos := timestampFromMs(mg.ts)
point := dest.AppendEmpty()
// gauge/undefined types have no start time.
if mg.mtype == pmetric.MetricTypeSum {

// StartTimeUnixNano to indicate the start of an unbroken sequence of points means it can also be used to encode implicit gaps in the stream
// https://opentelemetry.io/docs/reference/specification/metrics/data-model/#temporality
// Only Sum, Summary, Histogram is need to set start time stamp
if mType == pmetric.MetricTypeSum {
point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
}

point.SetTimestamp(tsNanos)

if value.IsStaleNaN(mg.value) {
point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
} else {
point.SetDoubleValue(mg.value)
}
populateAttributes(pmetric.MetricTypeGauge, mg.ls, point.Attributes())

datapointAttributes := point.Attributes()

populateAttributes(mType, pType, mg.ls, datapointAttributes)
mg.setExemplars(point.Exemplars())
}

func populateAttributes(mType pmetric.MetricType, ls labels.Labels, dest pcommon.Map) {
dest.EnsureCapacity(ls.Len())
names := getSortedNotUsefulLabels(mType)
j := 0
for i := range ls {
for j < len(names) && names[j] < ls[i].Name {
j++
}
if j < len(names) && ls[i].Name == names[j] {
continue
}
if ls[i].Value == "" {
// empty label values should be omitted
continue
}
dest.PutStr(ls[i].Name, ls[i].Value)
func populateAttributes(mType pmetric.MetricType, pType textparse.MetricType, ls labels.Labels, attributes pcommon.Map) {
attributes.EnsureCapacity(ls.Len() + 1)
// Remove unuseful Prometheus Labels since prometheus receiver added OTLP attributes
// during initial tracsaction
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/08cd036e9da7fa0678402fe8ce1d4d7bd3234a6a/receiver/prometheusreceiver/internal/transaction.go#L233
notUsefulLabels := getSortedNotUsefulLabels(mType)
finalLabels := ls.MatchLabels(false, notUsefulLabels...).WithoutEmpty()
for i := range finalLabels {
attributes.PutStr(finalLabels[i].Name, finalLabels[i].Value)
}
// Preserve the original Prometheus Type as an datapoint attributes
// (e.g Unknown is convert to Gauge )
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/4eaa9f8f8c89492e4df873bda483ea49da8b194f/receiver/prometheusreceiver/internal/util.go#L105-L127
if pType == textparse.MetricTypeUnknown {
attributes.PutStr(PromMetricType, string(pType))
}
}

func (mf *metricFamily) loadMetricGroupOrCreate(groupKey uint64, ls labels.Labels, ts int64) *metricGroup {
mg, ok := mf.groups[groupKey]
if !ok {
mg = &metricGroup{
mtype: mf.mtype,
ts: ts,
ls: ls,
exemplars: pmetric.NewExemplarSlice(),
Expand Down Expand Up @@ -297,15 +305,15 @@ func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, normalizer *pr
histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
hdpL := histogram.DataPoints()
for _, mg := range mf.groupOrders {
mg.toDistributionPoint(hdpL)
mg.toDistributionPoint(hdpL, mf.mtype, mf.ptype)
}
pointCount = hdpL.Len()

case pmetric.MetricTypeSummary:
summary := metric.SetEmptySummary()
sdpL := summary.DataPoints()
for _, mg := range mf.groupOrders {
mg.toSummaryPoint(sdpL)
mg.toSummaryPoint(sdpL, mf.mtype, mf.ptype)
}
pointCount = sdpL.Len()

Expand All @@ -315,15 +323,15 @@ func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, normalizer *pr
sum.SetIsMonotonic(mf.isMonotonic)
sdpL := sum.DataPoints()
for _, mg := range mf.groupOrders {
mg.toNumberDataPoint(sdpL)
mg.toNumberDataPoint(sdpL, mf.mtype, mf.ptype)
}
pointCount = sdpL.Len()

default: // Everything else should be set to a Gauge.
gauge := metric.SetEmptyGauge()
gdpL := gauge.DataPoints()
for _, mg := range mf.groupOrders {
mg.toNumberDataPoint(gdpL)
mg.toNumberDataPoint(gdpL, mf.mtype, mf.ptype)
}
pointCount = gdpL.Len()
}
Expand Down
9 changes: 4 additions & 5 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sort"

prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
Expand All @@ -35,8 +36,6 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

const (
Expand Down Expand Up @@ -199,14 +198,14 @@ func (t *transaction) getSeriesRef(ls labels.Labels, mtype pmetric.MetricType) u

// getMetrics returns all metrics to the given slice.
// The only error returned by this function is errNoDataToBuild.
func (t *transaction) getMetrics(resource pcommon.Resource) (pmetric.Metrics, error) {
func (t *transaction) getMetrics() (pmetric.Metrics, error) {
if len(t.families) == 0 {
return pmetric.Metrics{}, errNoDataToBuild
}

md := pmetric.NewMetrics()
rms := md.ResourceMetrics().AppendEmpty()
resource.CopyTo(rms.Resource())
t.nodeResource.CopyTo(rms.Resource())
metrics := rms.ScopeMetrics().AppendEmpty().Metrics()

for _, mf := range t.families {
Expand Down Expand Up @@ -241,7 +240,7 @@ func (t *transaction) Commit() error {
}

ctx := t.obsrecv.StartMetricsOp(t.ctx)
md, err := t.getMetrics(t.nodeResource)
md, err := t.getMetrics()
if err != nil {
t.obsrecv.EndMetricsOp(ctx, dataformat, 0, err)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
"math"
"testing"

"github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/model/value"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
)

var staleNaNsPage1 = `
Expand Down Expand Up @@ -287,7 +290,7 @@ func verifyNormalNaNs(t *testing.T, td *testData, resourceMetrics []pmetric.Reso
{
numberPointComparator: []numberPointComparator{
compareTimestamp(ts1),
compareAttributes(map[string]string{"name": "rough-snowflake-web", "port": "6380"}),
compareAttributes(map[string]string{"name": "rough-snowflake-web", "port": "6380", internal.PromMetricType: string(textparse.MetricTypeUnknown)}),
assertNormalNan(),
},
},
Expand Down Expand Up @@ -371,7 +374,7 @@ func verifyInfValues(t *testing.T, td *testData, resourceMetrics []pmetric.Resou
{
numberPointComparator: []numberPointComparator{
compareTimestamp(ts1),
compareAttributes(map[string]string{"name": "rough-snowflake-web", "port": "6380"}),
compareAttributes(map[string]string{"name": "rough-snowflake-web", "port": "6380", internal.PromMetricType: string(textparse.MetricTypeUnknown)}),
compareDoubleValue(math.Inf(-1)),
},
},
Expand Down
11 changes: 7 additions & 4 deletions receiver/prometheusreceiver/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import (

"github.com/prometheus/common/model"
promConfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/textparse"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pmetric"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
)

// Test data and validation functions for all four core metrics for Prometheus Receiver.
Expand Down Expand Up @@ -1463,14 +1466,14 @@ func verifyUntypedMetrics(t *testing.T, td *testData, resourceMetrics []pmetric.
numberPointComparator: []numberPointComparator{
compareTimestamp(ts1),
compareDoubleValue(100),
compareAttributes(map[string]string{"method": "post", "code": "200"}),
compareAttributes(map[string]string{"method": "post", "code": "200", internal.PromMetricType: string(textparse.MetricTypeUnknown)}),
},
},
{
numberPointComparator: []numberPointComparator{
compareTimestamp(ts1),
compareDoubleValue(5),
compareAttributes(map[string]string{"method": "post", "code": "400"}),
compareAttributes(map[string]string{"method": "post", "code": "400", internal.PromMetricType: string(textparse.MetricTypeUnknown)}),
},
},
}),
Expand All @@ -1481,14 +1484,14 @@ func verifyUntypedMetrics(t *testing.T, td *testData, resourceMetrics []pmetric.
numberPointComparator: []numberPointComparator{
compareTimestamp(ts1),
compareDoubleValue(10),
compareAttributes(map[string]string{"name": "rough-snowflake-web", "port": "6380"}),
compareAttributes(map[string]string{"name": "rough-snowflake-web", "port": "6380", internal.PromMetricType: string(textparse.MetricTypeUnknown)}),
},
},
{
numberPointComparator: []numberPointComparator{
compareTimestamp(ts1),
compareDoubleValue(12),
compareAttributes(map[string]string{"name": "rough-snowflake-web", "port": "6381"}),
compareAttributes(map[string]string{"name": "rough-snowflake-web", "port": "6381", internal.PromMetricType: string(textparse.MetricTypeUnknown)}),
},
},
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"github.com/prometheus/common/model"
promcfg "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/textparse"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
)

var renameMetric = `
Expand Down Expand Up @@ -144,7 +147,7 @@ func verifyRenameMetric(t *testing.T, td *testData, resourceMetrics []pmetric.Re
numberPointComparator: []numberPointComparator{
compareTimestamp(ts1),
compareDoubleValue(15),
compareAttributes(map[string]string{"method": "post", "port": "6380"}),
compareAttributes(map[string]string{"method": "post", "port": "6380", internal.PromMetricType: string(textparse.MetricTypeUnknown)}),
},
},
}),
Expand All @@ -156,14 +159,14 @@ func verifyRenameMetric(t *testing.T, td *testData, resourceMetrics []pmetric.Re
numberPointComparator: []numberPointComparator{
compareTimestamp(ts1),
compareDoubleValue(10),
compareAttributes(map[string]string{"method": "post", "port": "6380"}),
compareAttributes(map[string]string{"method": "post", "port": "6380", internal.PromMetricType: string(textparse.MetricTypeUnknown)}),
},
},
{
numberPointComparator: []numberPointComparator{
compareTimestamp(ts1),
compareDoubleValue(12),
compareAttributes(map[string]string{"method": "post", "port": "6381"}),
compareAttributes(map[string]string{"method": "post", "port": "6381", internal.PromMetricType: string(textparse.MetricTypeUnknown)}),
},
},
}),
Expand Down

0 comments on commit a610220

Please sign in to comment.