From b948cd37086289ae5e1b19fde4304c049de40049 Mon Sep 17 00:00:00 2001 From: Connor Lindsey Date: Wed, 12 Aug 2020 22:19:38 -0600 Subject: [PATCH] Add convertToTimeseries (#211) --- exporters/metric/cortex/cortex.go | 248 ++++++++++++++++++++++- exporters/metric/cortex/cortex_test.go | 172 ++++++++++++++++ exporters/metric/cortex/testutil_test.go | 81 ++++++++ 3 files changed, 497 insertions(+), 4 deletions(-) create mode 100644 exporters/metric/cortex/testutil_test.go diff --git a/exporters/metric/cortex/cortex.go b/exporters/metric/cortex/cortex.go index 7a5189bc446..fa7f5f5e776 100644 --- a/exporters/metric/cortex/cortex.go +++ b/exporters/metric/cortex/cortex.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "log" "net/http" "github.com/gogo/protobuf/proto" @@ -25,8 +26,10 @@ import ( "github.com/prometheus/prometheus/prompb" "go.opentelemetry.io/otel/api/global" + "go.opentelemetry.io/otel/api/label" apimetric "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/sdk/export/metric" + export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/selector/simple" @@ -44,6 +47,26 @@ func (e *Exporter) ExportKindFor(*apimetric.Descriptor, aggregation.Kind) metric // Export forwards metrics to Cortex from the SDK func (e *Exporter) Export(_ context.Context, checkpointSet metric.CheckpointSet) error { + timeseries, err := e.ConvertToTimeSeries(checkpointSet) + if err != nil { + return err + } + + message, buildMessageErr := e.buildMessage(timeseries) + if buildMessageErr != nil { + return buildMessageErr + } + + request, buildRequestErr := e.buildRequest(message) + if buildRequestErr != nil { + return buildRequestErr + } + + sendRequestErr := e.sendRequest(request) + if sendRequestErr != nil { + return sendRequestErr + } + return nil } @@ -86,11 +109,228 @@ func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller return pusher, nil } -// addHeaders adds required headers as well as all headers in Header map to a http -// request. +// ConvertToTimeSeries converts a CheckpointSet to a slice of TimeSeries pointers +func (e *Exporter) ConvertToTimeSeries(checkpointSet export.CheckpointSet) ([]*prompb.TimeSeries, error) { + var aggError error + var timeSeries []*prompb.TimeSeries + + // Iterate over each record in the checkpoint set and convert to TimeSeries + aggError = checkpointSet.ForEach(e, func(record metric.Record) error { + // Convert based on aggregation type + agg := record.Aggregation() + + // Check if aggregation has Sum value + if sum, ok := agg.(aggregation.Sum); ok { + tSeries, err := convertFromSum(record, sum) + if err != nil { + return err + } + + timeSeries = append(timeSeries, tSeries) + } + + // Check if aggregation has MinMaxSumCount value + if minMaxSumCount, ok := agg.(aggregation.MinMaxSumCount); ok { + tSeries, err := convertFromMinMaxSumCount(record, minMaxSumCount) + if err != nil { + return err + } + + timeSeries = append(timeSeries, tSeries...) + } else if lastValue, ok := agg.(aggregation.LastValue); ok { + tSeries, err := convertFromLastValue(record, lastValue) + if err != nil { + return err + } + + timeSeries = append(timeSeries, tSeries) + } + + return nil + }) + + // Check if error was returned in checkpointSet.ForEach() + if aggError != nil { + return nil, aggError + } + + return timeSeries, nil +} + +// convertFromSum returns a single TimeSeries based on a Record with a Sum aggregation +func convertFromSum(record metric.Record, sum aggregation.Sum) (*prompb.TimeSeries, error) { + // Get Sum value + value, err := sum.Sum() + if err != nil { + return nil, err + } + // Create sample from Sum value + sample := prompb.Sample{ + Value: value.CoerceToFloat64(record.Descriptor().NumberKind()), + Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64) + } + + // Create labels, including metric name + name := sanitize(record.Descriptor().Name()) + labels := createLabelSet(record, "__name__", name) + + // Create TimeSeries and return + tSeries := &prompb.TimeSeries{ + Samples: []prompb.Sample{sample}, + Labels: labels, + } + + return tSeries, nil +} + +// convertFromLastValue returns a single TimeSeries based on a Record with a LastValue aggregation +func convertFromLastValue(record metric.Record, lastValue aggregation.LastValue) (*prompb.TimeSeries, error) { + // Get value + value, _, err := lastValue.LastValue() + if err != nil { + return nil, err + } + + // Create sample from Last value + sample := prompb.Sample{ + Value: value.CoerceToFloat64(record.Descriptor().NumberKind()), + Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64) + } + + // Create labels, including metric name + name := sanitize(record.Descriptor().Name()) + labels := createLabelSet(record, "__name__", name) + + // Create TimeSeries and return + tSeries := &prompb.TimeSeries{ + Samples: []prompb.Sample{sample}, + Labels: labels, + } + + return tSeries, nil +} + +// convertFromMinMaxSumCount returns 4 TimeSeries for the min, max, sum, and count from the mmsc aggregation +func convertFromMinMaxSumCount(record metric.Record, minMaxSumCount aggregation.MinMaxSumCount) ([]*prompb.TimeSeries, error) { + // Convert Min + min, err := minMaxSumCount.Min() + if err != nil { + return nil, err + } + minSample := prompb.Sample{ + Value: min.CoerceToFloat64(record.Descriptor().NumberKind()), + Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64) + } + + // Create labels, including metric name + name := sanitize(record.Descriptor().Name() + "_min") + labels := createLabelSet(record, "__name__", name) + + // Create TimeSeries + minTimeSeries := &prompb.TimeSeries{ + Samples: []prompb.Sample{minSample}, + Labels: labels, + } + + // Convert Max + max, err := minMaxSumCount.Max() + if err != nil { + return nil, err + } + maxSample := prompb.Sample{ + Value: max.CoerceToFloat64(record.Descriptor().NumberKind()), + Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64) + } + + // Create labels, including metric name + name = sanitize(record.Descriptor().Name() + "_max") + labels = createLabelSet(record, "__name__", name) + + // Create TimeSeries + maxTimeSeries := &prompb.TimeSeries{ + Samples: []prompb.Sample{maxSample}, + Labels: labels, + } + + // Convert Count + count, err := minMaxSumCount.Count() + if err != nil { + return nil, err + } + countSample := prompb.Sample{ + Value: float64(count), + Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64) + } + + // Create labels, including metric name + name = sanitize(record.Descriptor().Name() + "_count") + labels = createLabelSet(record, "__name__", name) + + // Create TimeSeries + countTimeSeries := &prompb.TimeSeries{ + Samples: []prompb.Sample{countSample}, + Labels: labels, + } + + tSeries := []*prompb.TimeSeries{ + minTimeSeries, maxTimeSeries, countTimeSeries, + } + + return tSeries, nil +} + +// createLabelSet combines labels from a Record, resource, and extra labels to +// create a slice of prompb.Label +func createLabelSet(record metric.Record, extras ...string) []*prompb.Label { + // Map ensure no duplicate label names + labelMap := map[string]prompb.Label{} + + // mergeLabels merges Record and Resource labels into a single set, giving + // precedence to the record's labels. + mi := label.NewMergeIterator(record.Labels(), record.Resource().LabelSet()) + for mi.Next() { + label := mi.Label() + key := string(label.Key) + labelMap[key] = prompb.Label{ + Name: sanitize(key), + Value: label.Value.Emit(), + } + } + + // Add extra labels created by the exporter like the metric name + // or labels to represent histogram buckets + for i := 0; i < len(extras); i += 2 { + // Ensure even number of extras (key : value) + if i+1 >= len(extras) { + break + } + + // Ensure label doesn't exist. If it does, notify user that a user created label + // is being overwritten by a Prometheus reserved label (e.g. 'le' for histograms) + _, found := labelMap[extras[i]] + if found { + log.Printf("Label %s is overwritten. Check if Prometheus reserved labels are used.\n", extras[i]) + } + labelMap[extras[i]] = prompb.Label{ + Name: sanitize(extras[i]), + Value: extras[i+1], + } + } + + // Create slice of labels from labelMap and return + res := make([]*prompb.Label, 0, len(labelMap)) + for _, lb := range labelMap { + currentLabel := lb + res = append(res, ¤tLabel) + } + + return res +} + +// AddHeaders adds required headers as well as all headers in Header map to a http request. func (e *Exporter) addHeaders(req *http.Request) { - // Cortex expects Snappy-compressed protobuf messages. These three headers are - // hard-coded as they should be on every request. + // Cortex expects Snappy-compressed protobuf messages. These two headers are hard-coded as they + // should be on every request. req.Header.Add("X-Prometheus-Remote-Write-Version", "0.1.0") req.Header.Add("Content-Encoding", "snappy") req.Header.Set("Content-Type", "application/x-protobuf") diff --git a/exporters/metric/cortex/cortex_test.go b/exporters/metric/cortex/cortex_test.go index ecd5122a139..8f4e07ca025 100644 --- a/exporters/metric/cortex/cortex_test.go +++ b/exporters/metric/cortex/cortex_test.go @@ -27,11 +27,15 @@ import ( "github.com/golang/snappy" "github.com/google/go-cmp/cmp" "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/api/global" + "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/sdk/export/metric" + export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + "go.opentelemetry.io/otel/sdk/resource" ) // ValidConfig is a Config struct that should cause no errors. @@ -61,6 +65,9 @@ var validConfig = Config{ Client: http.DefaultClient, } +var testResource = resource.New(kv.String("R", "V")) +var mockTime int64 = time.Time{}.Unix() + func TestExportKindFor(t *testing.T) { exporter := Exporter{} got := exporter.ExportKindFor(nil, aggregation.Kind(0)) @@ -71,6 +78,171 @@ func TestExportKindFor(t *testing.T) { } } +func TestConvertToTimeSeries(t *testing.T) { + // Setup + exporter := Exporter{} + + // Test conversions based on aggregation type + tests := []struct { + name string + input export.CheckpointSet + want []*prompb.TimeSeries + wantLength int + }{ + { + name: "validCheckpointSet", + input: getValidCheckpointSet(t), + want: []*prompb.TimeSeries{ + { + Labels: []*prompb.Label{ + { + Name: "R", + Value: "V", + }, + { + Name: "__name__", + Value: "metric_name", + }, + }, + Samples: []prompb.Sample{{ + Value: 321, + Timestamp: mockTime, + }}, + }, + }, + wantLength: 1, + }, + { + name: "convertFromSum", + input: getSumCheckpoint(t, 321), + want: []*prompb.TimeSeries{ + { + Labels: []*prompb.Label{ + { + Name: "R", + Value: "V", + }, + { + Name: "__name__", + Value: "metric_name", + }, + }, + Samples: []prompb.Sample{{ + Value: 321, + Timestamp: mockTime, + }}, + }, + }, + wantLength: 1, + }, + { + name: "convertFromLastValue", + input: getLastValueCheckpoint(t, 123), + want: []*prompb.TimeSeries{ + { + Labels: []*prompb.Label{ + { + Name: "R", + Value: "V", + }, + { + Name: "__name__", + Value: "metric_name", + }, + }, + Samples: []prompb.Sample{{ + Value: 123, + Timestamp: mockTime, + }}, + }, + }, + wantLength: 1, + }, + { + name: "convertFromMinMaxSumCount", + input: getMMSCCheckpoint(t, 123.456, 876.543), + want: []*prompb.TimeSeries{ + { + Labels: []*prompb.Label{ + { + Name: "R", + Value: "V", + }, + { + Name: "__name__", + Value: "metric_name", + }, + }, + Samples: []prompb.Sample{{ + Value: 999.999, + Timestamp: mockTime, + }}, + }, + { + Labels: []*prompb.Label{ + { + Name: "R", + Value: "V", + }, + { + Name: "__name__", + Value: "metric_name_min", + }, + }, + Samples: []prompb.Sample{{ + Value: 123.456, + Timestamp: mockTime, + }}, + }, + { + Labels: []*prompb.Label{ + { + Name: "__name__", + Value: "metric_name_max", + }, + { + Name: "R", + Value: "V", + }, + }, + Samples: []prompb.Sample{{ + Value: 876.543, + Timestamp: mockTime, + }}, + }, + { + Labels: []*prompb.Label{ + { + Name: "R", + Value: "V", + }, + { + Name: "__name__", + Value: "metric_name_count", + }, + }, + Samples: []prompb.Sample{{ + Value: 2, + Timestamp: mockTime, + }}, + }, + }, + wantLength: 4, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := exporter.ConvertToTimeSeries(tt.input) + want := tt.want + + assert.Nil(t, err, "ConvertToTimeSeries error") + assert.Len(t, got, tt.wantLength, "Incorrect number of timeseries") + cmp.Equal(got, want) + }) + } +} + // TestNewRawExporter tests whether NewRawExporter successfully creates an Exporter with // the same Config struct as the one passed in. func TestNewRawExporter(t *testing.T) { diff --git a/exporters/metric/cortex/testutil_test.go b/exporters/metric/cortex/testutil_test.go new file mode 100644 index 00000000000..005d5022795 --- /dev/null +++ b/exporters/metric/cortex/testutil_test.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cortex + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/metric" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/metrictest" + "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" + "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" + "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" + "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" +) + +// getValidCheckpointSet returns a valid checkpointset with several records +func getValidCheckpointSet(t *testing.T) export.CheckpointSet { + return getSumCheckpoint(t, 321) +} + +// getSumCheckpoint returns a checkpoint set with a sum aggregation record +func getSumCheckpoint(t *testing.T, value int64) export.CheckpointSet { + // Create checkpoint set with resource and descriptor + checkpointSet := metrictest.NewCheckpointSet(testResource) + desc := metric.NewDescriptor("metric_name", metric.CounterKind, metric.Int64NumberKind) + + // Create aggregation, add value, and update checkpointset + agg, ckpt := metrictest.Unslice2(sum.New(2)) + aggregatortest.CheckedUpdate(t, agg, metric.NewInt64Number(value), &desc) + require.NoError(t, agg.SynchronizedMove(ckpt, &desc)) + checkpointSet.Add(&desc, ckpt) + + return checkpointSet +} + +// getLastValueCheckpoint returns a checkpoint set with a last value aggregation record +func getLastValueCheckpoint(t *testing.T, value int64) export.CheckpointSet { + // Create checkpoint set with resource and descriptor + checkpointSet := metrictest.NewCheckpointSet(testResource) + desc := metric.NewDescriptor("metric_name", metric.ValueObserverKind, metric.Int64NumberKind) + + // Create aggregation, add value, and update checkpointset + agg, ckpt := metrictest.Unslice2(lastvalue.New(2)) + aggregatortest.CheckedUpdate(t, agg, metric.NewInt64Number(value), &desc) + require.NoError(t, agg.SynchronizedMove(ckpt, &desc)) + checkpointSet.Add(&desc, ckpt) + + return checkpointSet +} + +// getMMSCCheckpoint returns a checkpoint set with a minmaxsumcount aggregation record +func getMMSCCheckpoint(t *testing.T, values ...float64) export.CheckpointSet { + // Create checkpoint set with resource and descriptor + checkpointSet := metrictest.NewCheckpointSet(testResource) + desc := metric.NewDescriptor("metric_name", metric.ValueRecorderKind, metric.Float64NumberKind) + + // Create aggregation, add value, and update checkpointset + agg, ckpt := metrictest.Unslice2(minmaxsumcount.New(2, &desc)) + for _, value := range values { + aggregatortest.CheckedUpdate(t, agg, metric.NewFloat64Number(value), &desc) + } + require.NoError(t, agg.SynchronizedMove(ckpt, &desc)) + checkpointSet.Add(&desc, ckpt) + + return checkpointSet +}