Skip to content

Commit

Permalink
[receiver/prometheusreceiver] implement append native histogram (#28663)
Browse files Browse the repository at this point in the history
**Description:** 

Implement native histogram append MVP.
Very similar to appending a float sample.

Limitations:
- Only support integer counter histograms fully.
- In case a histogram has both classic and native buckets, we only store
one of them. Governed by scrape_classic_histograms scrape option. The
reason is that in the OTEL model the metric family is identified by the
normalized name (without _count, _sum, _bucket suffixes for the classic
histograms), meaning that the classic and native histograms would map to
the same metric family in OTEL model , but that cannot have both
Histogram and ExponentialHistogram types at the same time.
- Gauge histograms are dropped with warning as that temporality is
unsupported, see
open-telemetry/opentelemetry-specification#2714
- NoRecordedValue attribute might be unreliable. Prometheus scrape marks
all series with float NaN values when stale, but transactions in
prometheusreceiver are stateless, meaning that we have to use heuristics
to figure out if we need to add a NoRecordedValue data point to an
Exponential Histogram metric. (Need work in Prometheus.)



Additionally: 
- Created timestamp supported.
- Float counter histograms not fully tested and lose precision, but we
don't expect instrumentation to expose these anyway.

**Link to tracking Issue:**

Fixes: #26555 

**Testing:** 

Added unit tests and e2e tests.

**Documentation:**

TBD: will have to call out protobuf negotiation while no text format.
#27030

---------

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Co-authored-by: David Ashpole <dashpole@google.com>
  • Loading branch information
krajorama and dashpole committed Apr 9, 2024
1 parent 17fe4f8 commit 13fca79
Show file tree
Hide file tree
Showing 16 changed files with 1,618 additions and 102 deletions.
36 changes: 36 additions & 0 deletions .chloggen/prometheusreceiver-append-native-histogram.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allows receiving prometheus native histograms

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26555]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- Native histograms are compatible with OTEL exponential histograms.
- The feature can be enabled via the feature gate `receiver.prometheusreceiver.EnableNativeHistograms`.
Run the collector with the command line option `--feature-gates=receiver.prometheusreceiver.EnableNativeHistograms`.
- Currently the feature also requires that targets are scraped via the ProtoBuf format.
To start scraping native histograms, set
`config.global.scrape_protocols` to `[ PrometheusProto, OpenMetricsText1.0.0, OpenMetricsText0.0.1, PrometheusText0.0.4 ]` in the
receiver configuration. This requirement will be lifted once Prometheus can scrape native histograms over text formats.
- For more up to date information see the README.md file of the receiver at
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/prometheusreceiver/README.md#prometheus-native-histograms.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
23 changes: 22 additions & 1 deletion receiver/prometheusreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ prometheus --config.file=prom.yaml
"--feature-gates=receiver.prometheusreceiver.UseCreatedMetric"
```

- `receiver.prometheusreceiver.EnableNativeHistograms`: process and turn native histogram metrics into OpenTelemetry exponential histograms. For more details consult the [Prometheus native histograms](#prometheus-native-histograms) section.

```shell
"--feature-gates=receiver.prometheusreceiver.EnableNativeHistograms"
```

- `report_extra_scrape_metrics`: Extra Prometheus scrape metrics can be reported by setting this parameter to `true`

You can copy and paste that same configuration under:
Expand Down Expand Up @@ -123,7 +129,22 @@ receivers:
- targets: ['0.0.0.0:8888']
```

## OpenTelemetry Operator
## Prometheus native histograms

Native histograms are an experimental [feature](https://prometheus.io/docs/prometheus/latest/feature_flags/#native-histograms) of Prometheus.

To start scraping native histograms, set `config.global.scrape_protocols` to `[ PrometheusProto, OpenMetricsText1.0.0, OpenMetricsText0.0.1, PrometheusText0.0.4 ]`
in the receiver configuration. This requirement will be lifted once Prometheus can scrape native histograms over text formats.

To enable converting native histograms to OpenTelemetry exponential histograms, enable the feature gate `receiver.prometheusreceiver.EnableNativeHistograms`.
The feature is considered experimental.

This feature applies to the most common integer counter histograms, gauge histograms are dropped.
In case a metric has both the conventional (aka classic) buckets and also native histogram buckets, only the native histogram buckets will be
taken into account to create the corresponding exponential histogram. To scrape the classic buckets instead use the
[scrape option](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) `scrape_classic_histograms`.

## OpenTelemetry Operator
Additional to this static job definitions this receiver allows to query a list of jobs from the
OpenTelemetryOperators TargetAllocator or a compatible endpoint.

Expand Down
8 changes: 8 additions & 0 deletions receiver/prometheusreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ var useCreatedMetricGate = featuregate.GlobalRegistry().MustRegister(
" retrieve the start time for Summary, Histogram and Sum metrics from _created metric"),
)

var enableNativeHistogramsGate = featuregate.GlobalRegistry().MustRegister(
"receiver.prometheusreceiver.EnableNativeHistograms",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, the Prometheus receiver will convert"+
" Prometheus native histograms to OTEL exponential histograms and ignore"+
" those Prometheus classic histograms that have a native histogram alternative"),
)

// NewFactory creates a new Prometheus receiver factory.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
Expand Down
33 changes: 18 additions & 15 deletions receiver/prometheusreceiver/internal/appendable.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import (

// appendable translates Prometheus scraping diffs into OpenTelemetry format.
type appendable struct {
sink consumer.Metrics
metricAdjuster MetricsAdjuster
useStartTimeMetric bool
trimSuffixes bool
startTimeMetricRegex *regexp.Regexp
externalLabels labels.Labels
sink consumer.Metrics
metricAdjuster MetricsAdjuster
useStartTimeMetric bool
enableNativeHistograms bool
trimSuffixes bool
startTimeMetricRegex *regexp.Regexp
externalLabels labels.Labels

settings receiver.CreateSettings
obsrecv *receiverhelper.ObsReport
Expand All @@ -36,6 +37,7 @@ func NewAppendable(
useStartTimeMetric bool,
startTimeMetricRegex *regexp.Regexp,
useCreatedMetric bool,
enableNativeHistograms bool,
externalLabels labels.Labels,
trimSuffixes bool) (storage.Appendable, error) {
var metricAdjuster MetricsAdjuster
Expand All @@ -51,17 +53,18 @@ func NewAppendable(
}

return &appendable{
sink: sink,
settings: set,
metricAdjuster: metricAdjuster,
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: startTimeMetricRegex,
externalLabels: externalLabels,
obsrecv: obsrecv,
trimSuffixes: trimSuffixes,
sink: sink,
settings: set,
metricAdjuster: metricAdjuster,
useStartTimeMetric: useStartTimeMetric,
enableNativeHistograms: enableNativeHistograms,
startTimeMetricRegex: startTimeMetricRegex,
externalLabels: externalLabels,
obsrecv: obsrecv,
trimSuffixes: trimSuffixes,
}, nil
}

func (o *appendable) Appender(ctx context.Context) storage.Appender {
return newTransaction(ctx, o.metricAdjuster, o.sink, o.externalLabels, o.settings, o.obsrecv, o.trimSuffixes)
return newTransaction(ctx, o.metricAdjuster, o.sink, o.externalLabels, o.settings, o.obsrecv, o.trimSuffixes, o.enableNativeHistograms)
}
163 changes: 161 additions & 2 deletions receiver/prometheusreceiver/internal/metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"

"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"
Expand Down Expand Up @@ -49,6 +50,8 @@ type metricGroup struct {
hasSum bool
created float64
value float64
hValue *histogram.Histogram
fhValue *histogram.FloatHistogram
complexValue []*dataPoint
exemplars pmetric.ExemplarSlice
}
Expand Down Expand Up @@ -156,6 +159,118 @@ func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice)
mg.setExemplars(point.Exemplars())
}

// toExponentialHistogramDataPoints is based on
// https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#exponential-histograms
func (mg *metricGroup) toExponentialHistogramDataPoints(dest pmetric.ExponentialHistogramDataPointSlice) {
if !mg.hasCount {
return
}
point := dest.AppendEmpty()
point.SetTimestamp(timestampFromMs(mg.ts))

// We do not set Min or Max as native histograms don't have that information.
switch {
case mg.fhValue != nil:
fh := mg.fhValue

if value.IsStaleNaN(fh.Sum) {
point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
// The count and sum are initialized to 0, so we don't need to set them.
} else {
point.SetScale(fh.Schema)
// Input is a float native histogram. This conversion will lose
// precision,but we don't actually expect float histograms in scrape,
// since these are typically the result of operations on integer
// native histograms in the database.
point.SetCount(uint64(fh.Count))
point.SetSum(fh.Sum)
point.SetZeroThreshold(fh.ZeroThreshold)
point.SetZeroCount(uint64(fh.ZeroCount))

if len(fh.PositiveSpans) > 0 {
point.Positive().SetOffset(fh.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
convertAbsoluteBuckets(fh.PositiveSpans, fh.PositiveBuckets, point.Positive().BucketCounts())
}
if len(fh.NegativeSpans) > 0 {
point.Negative().SetOffset(fh.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
convertAbsoluteBuckets(fh.NegativeSpans, fh.NegativeBuckets, point.Negative().BucketCounts())
}
}

case mg.hValue != nil:
h := mg.hValue

if value.IsStaleNaN(h.Sum) {
point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
// The count and sum are initialized to 0, so we don't need to set them.
} else {
point.SetScale(h.Schema)
point.SetCount(h.Count)
point.SetSum(h.Sum)
point.SetZeroThreshold(h.ZeroThreshold)
point.SetZeroCount(h.ZeroCount)

if len(h.PositiveSpans) > 0 {
point.Positive().SetOffset(h.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
convertDeltaBuckets(h.PositiveSpans, h.PositiveBuckets, point.Positive().BucketCounts())
}
if len(h.NegativeSpans) > 0 {
point.Negative().SetOffset(h.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
convertDeltaBuckets(h.NegativeSpans, h.NegativeBuckets, point.Negative().BucketCounts())
}
}

default:
// This should never happen.
return
}

tsNanos := timestampFromMs(mg.ts)
if mg.created != 0 {
point.SetStartTimestamp(timestampFromFloat64(mg.created))
} else {
// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
point.SetStartTimestamp(tsNanos)
}
point.SetTimestamp(tsNanos)
populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes())
mg.setExemplars(point.Exemplars())
}

func convertDeltaBuckets(spans []histogram.Span, deltas []int64, buckets pcommon.UInt64Slice) {
buckets.EnsureCapacity(len(deltas))
bucketIdx := 0
bucketCount := int64(0)
for spanIdx, span := range spans {
if spanIdx > 0 {
for i := int32(0); i < span.Offset; i++ {
buckets.Append(uint64(0))
}
}
for i := uint32(0); i < span.Length; i++ {
bucketCount += deltas[bucketIdx]
bucketIdx++
buckets.Append(uint64(bucketCount))
}
}
}

func convertAbsoluteBuckets(spans []histogram.Span, counts []float64, buckets pcommon.UInt64Slice) {
buckets.EnsureCapacity(len(counts))
bucketIdx := 0
for spanIdx, span := range spans {
if spanIdx > 0 {
for i := int32(0); i < span.Offset; i++ {
buckets.Append(uint64(0))
}
}
for i := uint32(0); i < span.Length; i++ {
buckets.Append(uint64(counts[bucketIdx]))
bucketIdx++
}
}
}

func (mg *metricGroup) setExemplars(exemplars pmetric.ExemplarSlice) {
if mg == nil {
return
Expand Down Expand Up @@ -296,13 +411,17 @@ func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels
}
mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary})
}
case pmetric.MetricTypeExponentialHistogram:
if metricName == mf.metadata.Metric+metricSuffixCreated {
mg.created = v
}
case pmetric.MetricTypeSum:
if metricName == mf.metadata.Metric+metricSuffixCreated {
mg.created = v
} else {
mg.value = v
}
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram:
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge:
fallthrough
default:
mg.value = v
Expand All @@ -311,6 +430,37 @@ func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels
return nil
}

func (mf *metricFamily) addExponentialHistogramSeries(seriesRef uint64, metricName string, ls labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) error {
mg := mf.loadMetricGroupOrCreate(seriesRef, ls, t)
if mg.ts != t {
return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName)
}
if mg.mtype != pmetric.MetricTypeExponentialHistogram {
return fmt.Errorf("metric type mismatch for exponential histogram metric %v type %s", metricName, mg.mtype.String())
}
switch {
case fh != nil:
if mg.hValue != nil {
return fmt.Errorf("exponential histogram %v already has float counts", metricName)
}
mg.count = fh.Count
mg.sum = fh.Sum
mg.hasCount = true
mg.hasSum = true
mg.fhValue = fh
case h != nil:
if mg.fhValue != nil {
return fmt.Errorf("exponential histogram %v already has integer counts", metricName)
}
mg.count = float64(h.Count)
mg.sum = h.Sum
mg.hasCount = true
mg.hasSum = true
mg.hValue = h
}
return nil
}

func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes bool) {
metric := pmetric.NewMetric()
// Trims type and unit suffixes from metric name
Expand Down Expand Up @@ -352,7 +502,16 @@ func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes b
}
pointCount = sdpL.Len()

case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram:
case pmetric.MetricTypeExponentialHistogram:
histogram := metric.SetEmptyExponentialHistogram()
histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
hdpL := histogram.DataPoints()
for _, mg := range mf.groupOrders {
mg.toExponentialHistogramDataPoints(hdpL)
}
pointCount = hdpL.Len()

case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge:
fallthrough
default: // Everything else should be set to a Gauge.
gauge := metric.SetEmptyGauge()
Expand Down
Loading

0 comments on commit 13fca79

Please sign in to comment.