Skip to content

Commit

Permalink
Add convertToTimeseries (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
connorlindsey committed Aug 13, 2020
1 parent 6fd38b9 commit b948cd3
Show file tree
Hide file tree
Showing 3 changed files with 497 additions and 4 deletions.
248 changes: 244 additions & 4 deletions exporters/metric/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ import (
"bytes"
"context"
"fmt"
"log"
"net/http"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/label"
apimetric "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/sdk/export/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
Expand All @@ -44,6 +47,26 @@ func (e *Exporter) ExportKindFor(*apimetric.Descriptor, aggregation.Kind) metric

// Export forwards metrics to Cortex from the SDK
func (e *Exporter) Export(_ context.Context, checkpointSet metric.CheckpointSet) error {
timeseries, err := e.ConvertToTimeSeries(checkpointSet)
if err != nil {
return err
}

message, buildMessageErr := e.buildMessage(timeseries)
if buildMessageErr != nil {
return buildMessageErr
}

request, buildRequestErr := e.buildRequest(message)
if buildRequestErr != nil {
return buildRequestErr
}

sendRequestErr := e.sendRequest(request)
if sendRequestErr != nil {
return sendRequestErr
}

return nil
}

Expand Down Expand Up @@ -86,11 +109,228 @@ func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller
return pusher, nil
}

// addHeaders adds required headers as well as all headers in Header map to a http
// request.
// ConvertToTimeSeries converts a CheckpointSet to a slice of TimeSeries pointers
func (e *Exporter) ConvertToTimeSeries(checkpointSet export.CheckpointSet) ([]*prompb.TimeSeries, error) {
var aggError error
var timeSeries []*prompb.TimeSeries

// Iterate over each record in the checkpoint set and convert to TimeSeries
aggError = checkpointSet.ForEach(e, func(record metric.Record) error {
// Convert based on aggregation type
agg := record.Aggregation()

// Check if aggregation has Sum value
if sum, ok := agg.(aggregation.Sum); ok {
tSeries, err := convertFromSum(record, sum)
if err != nil {
return err
}

timeSeries = append(timeSeries, tSeries)
}

// Check if aggregation has MinMaxSumCount value
if minMaxSumCount, ok := agg.(aggregation.MinMaxSumCount); ok {
tSeries, err := convertFromMinMaxSumCount(record, minMaxSumCount)
if err != nil {
return err
}

timeSeries = append(timeSeries, tSeries...)
} else if lastValue, ok := agg.(aggregation.LastValue); ok {
tSeries, err := convertFromLastValue(record, lastValue)
if err != nil {
return err
}

timeSeries = append(timeSeries, tSeries)
}

return nil
})

// Check if error was returned in checkpointSet.ForEach()
if aggError != nil {
return nil, aggError
}

return timeSeries, nil
}

// convertFromSum returns a single TimeSeries based on a Record with a Sum aggregation
func convertFromSum(record metric.Record, sum aggregation.Sum) (*prompb.TimeSeries, error) {
// Get Sum value
value, err := sum.Sum()
if err != nil {
return nil, err
}
// Create sample from Sum value
sample := prompb.Sample{
Value: value.CoerceToFloat64(record.Descriptor().NumberKind()),
Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64)
}

// Create labels, including metric name
name := sanitize(record.Descriptor().Name())
labels := createLabelSet(record, "__name__", name)

// Create TimeSeries and return
tSeries := &prompb.TimeSeries{
Samples: []prompb.Sample{sample},
Labels: labels,
}

return tSeries, nil
}

// convertFromLastValue returns a single TimeSeries based on a Record with a LastValue aggregation
func convertFromLastValue(record metric.Record, lastValue aggregation.LastValue) (*prompb.TimeSeries, error) {
// Get value
value, _, err := lastValue.LastValue()
if err != nil {
return nil, err
}

// Create sample from Last value
sample := prompb.Sample{
Value: value.CoerceToFloat64(record.Descriptor().NumberKind()),
Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64)
}

// Create labels, including metric name
name := sanitize(record.Descriptor().Name())
labels := createLabelSet(record, "__name__", name)

// Create TimeSeries and return
tSeries := &prompb.TimeSeries{
Samples: []prompb.Sample{sample},
Labels: labels,
}

return tSeries, nil
}

// convertFromMinMaxSumCount returns 4 TimeSeries for the min, max, sum, and count from the mmsc aggregation
func convertFromMinMaxSumCount(record metric.Record, minMaxSumCount aggregation.MinMaxSumCount) ([]*prompb.TimeSeries, error) {
// Convert Min
min, err := minMaxSumCount.Min()
if err != nil {
return nil, err
}
minSample := prompb.Sample{
Value: min.CoerceToFloat64(record.Descriptor().NumberKind()),
Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64)
}

// Create labels, including metric name
name := sanitize(record.Descriptor().Name() + "_min")
labels := createLabelSet(record, "__name__", name)

// Create TimeSeries
minTimeSeries := &prompb.TimeSeries{
Samples: []prompb.Sample{minSample},
Labels: labels,
}

// Convert Max
max, err := minMaxSumCount.Max()
if err != nil {
return nil, err
}
maxSample := prompb.Sample{
Value: max.CoerceToFloat64(record.Descriptor().NumberKind()),
Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64)
}

// Create labels, including metric name
name = sanitize(record.Descriptor().Name() + "_max")
labels = createLabelSet(record, "__name__", name)

// Create TimeSeries
maxTimeSeries := &prompb.TimeSeries{
Samples: []prompb.Sample{maxSample},
Labels: labels,
}

// Convert Count
count, err := minMaxSumCount.Count()
if err != nil {
return nil, err
}
countSample := prompb.Sample{
Value: float64(count),
Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64)
}

// Create labels, including metric name
name = sanitize(record.Descriptor().Name() + "_count")
labels = createLabelSet(record, "__name__", name)

// Create TimeSeries
countTimeSeries := &prompb.TimeSeries{
Samples: []prompb.Sample{countSample},
Labels: labels,
}

tSeries := []*prompb.TimeSeries{
minTimeSeries, maxTimeSeries, countTimeSeries,
}

return tSeries, nil
}

// createLabelSet combines labels from a Record, resource, and extra labels to
// create a slice of prompb.Label
func createLabelSet(record metric.Record, extras ...string) []*prompb.Label {
// Map ensure no duplicate label names
labelMap := map[string]prompb.Label{}

// mergeLabels merges Record and Resource labels into a single set, giving
// precedence to the record's labels.
mi := label.NewMergeIterator(record.Labels(), record.Resource().LabelSet())
for mi.Next() {
label := mi.Label()
key := string(label.Key)
labelMap[key] = prompb.Label{
Name: sanitize(key),
Value: label.Value.Emit(),
}
}

// Add extra labels created by the exporter like the metric name
// or labels to represent histogram buckets
for i := 0; i < len(extras); i += 2 {
// Ensure even number of extras (key : value)
if i+1 >= len(extras) {
break
}

// Ensure label doesn't exist. If it does, notify user that a user created label
// is being overwritten by a Prometheus reserved label (e.g. 'le' for histograms)
_, found := labelMap[extras[i]]
if found {
log.Printf("Label %s is overwritten. Check if Prometheus reserved labels are used.\n", extras[i])
}
labelMap[extras[i]] = prompb.Label{
Name: sanitize(extras[i]),
Value: extras[i+1],
}
}

// Create slice of labels from labelMap and return
res := make([]*prompb.Label, 0, len(labelMap))
for _, lb := range labelMap {
currentLabel := lb
res = append(res, &currentLabel)
}

return res
}

// AddHeaders adds required headers as well as all headers in Header map to a http request.
func (e *Exporter) addHeaders(req *http.Request) {
// Cortex expects Snappy-compressed protobuf messages. These three headers are
// hard-coded as they should be on every request.
// Cortex expects Snappy-compressed protobuf messages. These two headers are hard-coded as they
// should be on every request.
req.Header.Add("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
Expand Down

0 comments on commit b948cd3

Please sign in to comment.