prometheusremotewrite: Hash labels using xxhash for performance
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
aknuds1 committed Feb 28, 2024
1 parent a257d0f commit a8cf050
Showing 18 changed files with 773 additions and 634 deletions.
27 changes: 27 additions & 0 deletions .chloggen/arve_xxhash.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change prometheusremotewrite.FromMetrics's tsMap argument to have uint64 keys instead of strings, for more performant hashing.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31385]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
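For orientation, here is a minimal sketch of how a uint64 map key can be derived from a label set with github.com/cespare/xxhash/v2. The helper name, the 0xff separator byte, and the assumption that labels arrive pre-sorted are illustrative; the commit's actual signature function may differ.

package main

import (
    "fmt"

    "github.com/cespare/xxhash/v2"
    "github.com/prometheus/prometheus/prompb"
)

// timeSeriesSignature is a hypothetical helper: it hashes a label set into a
// uint64 map key. It assumes the labels are already sorted by name and writes
// a separator byte so adjacent name/value pairs cannot collide by concatenation.
func timeSeriesSignature(labels []prompb.Label) uint64 {
    h := xxhash.New()
    for _, l := range labels {
        _, _ = h.WriteString(l.Name)
        _, _ = h.Write([]byte{0xff})
        _, _ = h.WriteString(l.Value)
        _, _ = h.Write([]byte{0xff})
    }
    return h.Sum64()
}

func main() {
    labels := []prompb.Label{
        {Name: "__name__", Value: "http_requests_total"},
        {Name: "job", Value: "api"},
    }
    // The series map is keyed by the xxhash signature instead of a string key.
    tsMap := map[uint64]*prompb.TimeSeries{}
    tsMap[timeSeriesSignature(labels)] = &prompb.TimeSeries{Labels: labels}
    fmt.Printf("series in map: %d\n", len(tsMap))
}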
2 changes: 1 addition & 1 deletion exporter/prometheusremotewriteexporter/exporter.go
@@ -152,7 +152,7 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
return sanitizedLabels, nil
}

-func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata) error {
+func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[uint64]*prompb.TimeSeries, m []*prompb.MetricMetadata) error {
// There are no metrics to export, so return.
if len(tsMap) == 0 {
return nil
10 changes: 5 additions & 5 deletions exporter/prometheusremotewriteexporter/exporter_test.go
@@ -338,9 +338,9 @@ func TestNoMetricsNoError(t *testing.T) {

func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
// First we will construct a TimeSeries array from the testutils package
-testmap := make(map[string]*prompb.TimeSeries)
+testmap := make(map[uint64]*prompb.TimeSeries)
if ts != nil {
-testmap["test"] = ts
+testmap[1] = ts
}

cfg := createDefaultConfig().(*Config)
@@ -916,9 +916,9 @@ func TestWALOnExporterRoundTrip(t *testing.T) {
Labels: []prompb.Label{{Name: "ts2l1", Value: "ts2k1"}},
Samples: []prompb.Sample{{Value: 2, Timestamp: 200}},
}
-tsMap := map[string]*prompb.TimeSeries{
-"timeseries1": ts1,
-"timeseries2": ts2,
+tsMap := map[uint64]*prompb.TimeSeries{
+1: ts1,
+2: ts2,
}
errs := prwe.handleExport(ctx, tsMap, nil)
assert.NoError(t, errs)
2 changes: 1 addition & 1 deletion exporter/prometheusremotewriteexporter/helper.go
@@ -11,7 +11,7 @@ import (
)

// batchTimeSeries splits series into multiple batch write requests.
-func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) {
+func batchTimeSeries(tsMap map[uint64]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) {
if len(tsMap) == 0 {
return nil, errors.New("invalid tsMap: cannot be empty map")
}
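As a rough illustration of the batching step, the sketch below splits a uint64-keyed series map into write requests that stay under a byte budget. The function name and the size accounting are simplified assumptions, not the exporter's actual batchTimeSeries (which also carries metric metadata and returns an error for empty input).

package prometheusremotewriteexporter

import "github.com/prometheus/prometheus/prompb"

// batchByByteSize is a hypothetical sketch: it accumulates series into
// WriteRequests, starting a new request whenever the running marshalled size
// would exceed maxBatchByteSize.
func batchByByteSize(tsMap map[uint64]*prompb.TimeSeries, maxBatchByteSize int) []*prompb.WriteRequest {
    var requests []*prompb.WriteRequest
    var current []prompb.TimeSeries
    currentSize := 0
    for _, ts := range tsMap {
        size := ts.Size() // gogo-generated proto size of this series
        if len(current) > 0 && currentSize+size > maxBatchByteSize {
            requests = append(requests, &prompb.WriteRequest{Timeseries: current})
            current = nil
            currentSize = 0
        }
        current = append(current, *ts)
        currentSize += size
    }
    if len(current) > 0 {
        requests = append(requests, &prompb.WriteRequest{Timeseries: current})
    }
    return requests
}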
2 changes: 1 addition & 1 deletion exporter/prometheusremotewriteexporter/helper_test.go
@@ -27,7 +27,7 @@ func Test_batchTimeSeries(t *testing.T) {

tests := []struct {
name string
-tsMap map[string]*prompb.TimeSeries
+tsMap map[uint64]*prompb.TimeSeries
maxBatchByteSize int
numExpectedRequests int
returnErr bool
7 changes: 3 additions & 4 deletions exporter/prometheusremotewriteexporter/testutil_test.go
@@ -4,7 +4,6 @@
package prometheusremotewriteexporter

import (
-"fmt"
"strings"
"time"

@@ -384,10 +383,10 @@ func getQuantiles(bounds []float64, values []float64) pmetric.SummaryDataPointVa
return quantiles
}

-func getTimeseriesMap(timeseries []*prompb.TimeSeries) map[string]*prompb.TimeSeries {
-tsMap := make(map[string]*prompb.TimeSeries)
+func getTimeseriesMap(timeseries []*prompb.TimeSeries) map[uint64]*prompb.TimeSeries {
+tsMap := make(map[uint64]*prompb.TimeSeries)
for i, v := range timeseries {
-tsMap[fmt.Sprintf("%s%d", "timeseries_name", i)] = v
+tsMap[uint64(i+1)] = v
}
return tsMap
}
120 changes: 120 additions & 0 deletions pkg/translator/prometheusremotewrite/common.go
@@ -0,0 +1,120 @@
package prometheusremotewrite

import (
"errors"
"fmt"

"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"

prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

type Settings struct {
Namespace string
ExternalLabels map[string]string
DisableTargetInfo bool
ExportCreatedMetric bool
AddMetricSuffixes bool
SendMetadata bool
}

type MetricsConverter interface {
// AddSample adds a sample and returns the metric hash.
AddSample(*prompb.Sample, []prompb.Label) uint64
// AddExemplars converts exemplars.
AddExemplars(uint64, pmetric.HistogramDataPoint, []bucketBoundsData)
// AddGaugeNumberDataPoints converts Gauge metric data points.
AddGaugeNumberDataPoints(pmetric.NumberDataPointSlice, pcommon.Resource, pmetric.Metric, Settings, string) error
// AddSumNumberDataPoints converts Sum metric data points.
AddSumNumberDataPoints(pmetric.NumberDataPointSlice, pcommon.Resource, pmetric.Metric, Settings, string) error
// AddSummaryDataPoints converts summary metric data points, each to len(QuantileValues) + 2 samples.
AddSummaryDataPoints(pmetric.SummaryDataPointSlice, pcommon.Resource, pmetric.Metric, Settings, string) error
// AddHistogramDataPoints converts histogram metric data points, each to 2 + min(len(ExplicitBounds), len(BucketCounts)) + 1 samples.
// It ignores extra buckets if len(ExplicitBounds) > len(BucketCounts).
AddHistogramDataPoints(pmetric.HistogramDataPointSlice, pcommon.Resource, pmetric.Metric, Settings, string) error
// AddExponentialHistogramDataPoints converts exponential histogram metric data points.
AddExponentialHistogramDataPoints(pmetric.ExponentialHistogramDataPointSlice, pcommon.Resource, Settings, string) error
AddMetricIfNeeded([]prompb.Label, pcommon.Timestamp, pcommon.Timestamp)
}

// FromMetrics converts pmetric.Metrics to another metrics format via the converter.
func FromMetrics(md pmetric.Metrics, settings Settings, converter MetricsConverter) (errs error) {
resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
resource := resourceMetrics.Resource()
scopeMetricsSlice := resourceMetrics.ScopeMetrics()
// keep track of the most recent timestamp in the ResourceMetrics for
// use with the "target" info metric
var mostRecentTimestamp pcommon.Timestamp
for j := 0; j < scopeMetricsSlice.Len(); j++ {
metricSlice := scopeMetricsSlice.At(j).Metrics()

// TODO: decide if instrumentation library information should be exported as labels
for k := 0; k < metricSlice.Len(); k++ {
metric := metricSlice.At(k)
mostRecentTimestamp = maxTimestamp(mostRecentTimestamp, mostRecentTimestampInMetric(metric))

if !isValidAggregationTemporality(metric) {
errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name()))
continue
}

promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes)

// handle individual metrics based on type
//exhaustive:enforce
switch metric.Type() {
case pmetric.MetricTypeGauge:
dataPoints := metric.Gauge().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
errs = multierr.Append(errs, converter.AddGaugeNumberDataPoints(dataPoints, resource, metric, settings, promName))
case pmetric.MetricTypeSum:
dataPoints := metric.Sum().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
errs = multierr.Append(errs, converter.AddSumNumberDataPoints(dataPoints, resource, metric, settings, promName))
case pmetric.MetricTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
errs = multierr.Append(errs, converter.AddHistogramDataPoints(dataPoints, resource, metric, settings, promName))
case pmetric.MetricTypeExponentialHistogram:
dataPoints := metric.ExponentialHistogram().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
errs = multierr.Append(errs, converter.AddExponentialHistogramDataPoints(
dataPoints,
resource,
settings,
promName,
))
case pmetric.MetricTypeSummary:
dataPoints := metric.Summary().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
errs = multierr.Append(errs, converter.AddSummaryDataPoints(dataPoints, resource, metric, settings, promName))
default:
errs = multierr.Append(errs, errors.New("unsupported metric type"))
}
}
}
addResourceTargetInfo(resource, settings, mostRecentTimestamp, converter)
}

return
}
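A short usage sketch for the new converter-based API in this package: the helper below and its settings values are hypothetical, and the concrete MetricsConverter implementation is assumed to come from elsewhere in the package.

package prometheusremotewrite

import (
    "log"

    "go.opentelemetry.io/collector/pdata/pmetric"
)

// convertAll is a hypothetical helper showing how FromMetrics is driven with
// any MetricsConverter implementation. Errors for individual metrics are
// aggregated via multierr, so a non-nil error does not mean nothing was
// converted.
func convertAll(md pmetric.Metrics, conv MetricsConverter, settings Settings) error {
    if err := FromMetrics(md, settings, conv); err != nil {
        log.Printf("conversion finished with errors: %v", err)
        return err
    }
    return nil
}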
3 changes: 3 additions & 0 deletions pkg/translator/prometheusremotewrite/go.mod
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/
go 1.21

require (
+github.com/cespare/xxhash/v2 v2.2.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.95.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.95.0
github.com/prometheus/common v0.46.0
@@ -18,12 +19,14 @@
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
+github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/collector/featuregate v1.2.0 // indirect
+golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
8 changes: 8 additions & 0 deletions pkg/translator/prometheusremotewrite/go.sum

Some generated files are not rendered by default.
