Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to send underlying metric timestamps on prometheus output #11

Merged
merged 1 commit into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"net/http"
"sort"
"sync"
"time"

"go.opencensus.io/trace"

"github.com/golang/protobuf/ptypes"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
Expand All @@ -44,10 +46,11 @@ type Exporter struct {

// Options customizes a created Prometheus Exporter.
type Options struct {
Namespace string
OnError func(err error)
ConstLabels prometheus.Labels // ConstLabels will be set as labels on all views.
Registry *prometheus.Registry
Namespace string
OnError func(err error)
ConstLabels prometheus.Labels // ConstLabels will be set as labels on all views.
Registry *prometheus.Registry
SendTimestamps bool
}

// New is the constructor to make an Exporter with the defined Options.
Expand Down Expand Up @@ -237,7 +240,7 @@ func (c *collector) protoTimeSeriesToPrometheusMetrics(ctx context.Context, metr

pmetrics := make([]prometheus.Metric, 0, len(ts.Points))
for _, point := range ts.Points {
pmet, err := protoMetricToPrometheusMetric(ctx, point, desc, derivedPrometheusValueType, labelValues)
pmet, err := protoMetricToPrometheusMetric(ctx, point, desc, derivedPrometheusValueType, labelValues, c.opts.SendTimestamps)
if err == nil {
pmetrics = append(pmetrics, pmet)
} else {
Expand Down Expand Up @@ -277,7 +280,12 @@ func protoLabelKeysToLabels(protoLabelKeys []*metricspb.LabelKey) []string {
return labelKeys
}

func protoMetricToPrometheusMetric(ctx context.Context, point *metricspb.Point, desc *prometheus.Desc, derivedPrometheusType prometheus.ValueType, labelValues []string) (prometheus.Metric, error) {
func protoMetricToPrometheusMetric(ctx context.Context, point *metricspb.Point, desc *prometheus.Desc, derivedPrometheusType prometheus.ValueType, labelValues []string, sendTimestamps bool) (prometheus.Metric, error) {
timestamp, err := ptypes.Timestamp(point.Timestamp)
if err != nil {
timestamp = time.Now()
}

switch value := point.Value.(type) {
case *metricspb.Point_DistributionValue:
dValue := value.DistributionValue
Expand Down Expand Up @@ -308,14 +316,26 @@ func protoMetricToPrometheusMetric(ctx context.Context, point *metricspb.Point,
cumCount += countPerBucket
points[bucket] = cumCount
}
return prometheus.NewConstHistogram(desc, uint64(dValue.Count), dValue.Sum, points, labelValues...)
metric, err := prometheus.NewConstHistogram(desc, uint64(dValue.Count), dValue.Sum, points, labelValues...)
if err != nil || !sendTimestamps {
return metric, err
}
return prometheus.NewMetricWithTimestamp(timestamp, metric), nil

case *metricspb.Point_Int64Value:
// Derive the Prometheus
return prometheus.NewConstMetric(desc, derivedPrometheusType, float64(value.Int64Value), labelValues...)
metric, err := prometheus.NewConstMetric(desc, derivedPrometheusType, float64(value.Int64Value), labelValues...)
if err != nil || !sendTimestamps {
return metric, err
}
return prometheus.NewMetricWithTimestamp(timestamp, metric), nil

case *metricspb.Point_DoubleValue:
return prometheus.NewConstMetric(desc, derivedPrometheusType, value.DoubleValue, labelValues...)
metric, err := prometheus.NewConstMetric(desc, derivedPrometheusType, value.DoubleValue, labelValues...)
if err != nil || !sendTimestamps {
return metric, err
}
return prometheus.NewMetricWithTimestamp(timestamp, metric), nil

default:
return nil, fmt.Errorf("Unhandled type: %T", point.Value)
Expand Down
92 changes: 88 additions & 4 deletions prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ var (
Seconds: 1543160298,
Nanos: 100000997,
}
// before is a scrape that happened 5s earlier
startTimestampBefore = &timestamp.Timestamp{
Seconds: 1543160293,
Nanos: 100000090,
}
endTimestampBefore = &timestamp.Timestamp{
Seconds: 1543160293,
Nanos: 100000997,
}
)

func TestOnlyCumulativeWindowSupported(t *testing.T) {
Expand Down Expand Up @@ -303,31 +312,31 @@ func makeMetrics() []*metricspb.Metric {
},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: startTimestamp,
StartTimestamp: startTimestampBefore,
LabelValues: []*metricspb.LabelValue{
{Value: "windows"},
{Value: "x86"},
{Value: "Storage"},
},
Points: []*metricspb.Point{
{
Timestamp: endTimestamp,
Timestamp: endTimestampBefore,
Value: &metricspb.Point_Int64Value{
Int64Value: 99,
},
},
},
},
{
StartTimestamp: startTimestamp,
StartTimestamp: startTimestampBefore,
LabelValues: []*metricspb.LabelValue{
{Value: "darwin"},
{Value: "386"},
{Value: "Ops"},
},
Points: []*metricspb.Point{
{
Timestamp: endTimestamp,
Timestamp: endTimestampBefore,
Value: &metricspb.Point_DoubleValue{
DoubleValue: 49.5,
},
Expand Down Expand Up @@ -443,3 +452,78 @@ with_metric_descriptor_count 2
t.Errorf("Mismatched output\nGot:\n%s\nWant:\n%s", g, w)
}
}

func TestMetricsEndpointWithTimestampOutput(t *testing.T) {
exp, err := New(Options{
SendTimestamps: true,
})
if err != nil {
t.Fatalf("Failed to create Prometheus exporter: %v", err)
}

srv := httptest.NewServer(exp)
defer srv.Close()

// Now record some metrics.
metrics := makeMetrics()
for _, metric := range metrics {
exp.ExportMetric(context.Background(), nil, nil, metric)
}

var i int
var output string
for {
time.Sleep(10 * time.Millisecond)
if i == 1000 {
t.Fatal("no output at / (10s wait)")
}
i++

resp, err := http.Get(srv.URL)
if err != nil {
t.Fatalf("Failed to get metrics on / error: %v", err)
}

slurp, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
t.Fatalf("Failed to read body: %v", err)
}

output = string(slurp)
if output != "" {
break
}
}

if strings.Contains(output, "collected before with the same name and label values") {
t.Fatalf("metric name and labels were duplicated but must be unique. Got\n\t%q", output)
}

if strings.Contains(output, "error(s) occurred") {
t.Fatalf("error reported by Prometheus registry:\n\t%s", output)
}

want := `# HELP a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_ Unlimited metric key lengths
# TYPE a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_ counter
a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_{arch="x86",keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey="",my_org_department="Storage",os="windows"} 99 1543160298100
# HELP this_one_there_where_ Extra ones
# TYPE this_one_there_where_ gauge
this_one_there_where_{arch="386",my_org_department="Ops",os="darwin"} 49.5 1543160293100
this_one_there_where_{arch="x86",my_org_department="Storage",os="windows"} 99 1543160293100
# HELP with_metric_descriptor This is a test
# TYPE with_metric_descriptor histogram
with_metric_descriptor_bucket{le="0"} 0 1543160298100
with_metric_descriptor_bucket{le="10"} 1 1543160298100
with_metric_descriptor_bucket{le="20"} 1 1543160298100
with_metric_descriptor_bucket{le="30"} 1 1543160298100
with_metric_descriptor_bucket{le="40"} 6 1543160298100
with_metric_descriptor_bucket{le="+Inf"} 2 1543160298100
with_metric_descriptor_sum 61.9 1543160298100
with_metric_descriptor_count 2 1543160298100
`

if g, w := output, want; g != w {
t.Errorf("Mismatched output\nGot:\n%s\nWant:\n%s", g, w)
}
}