Skip to content

Commit

Permalink
Adds a new config StartTimeMetricRegex
Browse files Browse the repository at this point in the history
Previously the prometheus receiver only accepted
`process_start_time_seconds` as the start-time metric when UseStartTimeMetric is
set. For applications that export similar metrics but with a prefix,
e.g., via NewProcessCollector(namespace), the receiver would drop such
metrics.
Adding StartTimeMetricRegex lets the receiver cover at least these
use cases.
  • Loading branch information
Weil0ng committed Aug 7, 2020
1 parent 6b25af7 commit d1cdd6c
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 77 deletions.
1 change: 1 addition & 0 deletions receiver/prometheusreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
BufferPeriod time.Duration `mapstructure:"buffer_period"`
BufferCount int `mapstructure:"buffer_count"`
UseStartTimeMetric bool `mapstructure:"use_start_time_metric"`
StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"`

// ConfigPlaceholder is just an entry to make the configuration pass a check
// that requires that all keys present in the config actually exist on the
Expand Down
1 change: 1 addition & 0 deletions receiver/prometheusreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, r1.PrometheusConfig.ScrapeConfigs[0].JobName, "demo")
assert.Equal(t, time.Duration(r1.PrometheusConfig.ScrapeConfigs[0].ScrapeInterval), 5*time.Second)
assert.Equal(t, r1.UseStartTimeMetric, true)
assert.Equal(t, r1.StartTimeMetricRegex, "^(.+_)*process_start_time_seconds$")
}

func TestLoadConfigWithEnvVar(t *testing.T) {
Expand Down
56 changes: 35 additions & 21 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package internal
import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"

Expand Down Expand Up @@ -51,33 +52,46 @@ var (
)

type metricBuilder struct {
hasData bool
hasInternalMetric bool
mc MetadataCache
metrics []*metricspb.Metric
numTimeseries int
droppedTimeseries int
useStartTimeMetric bool
startTime float64
scrapeLatencyMs float64
scrapeStatus string
logger *zap.Logger
currentMf MetricFamily
hasData bool
hasInternalMetric bool
mc MetadataCache
metrics []*metricspb.Metric
numTimeseries int
droppedTimeseries int
useStartTimeMetric bool
startTimeMetricRegex *regexp.Regexp
startTime float64
scrapeLatencyMs float64
scrapeStatus string
logger *zap.Logger
currentMf MetricFamily
}

// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
// by calling its Build function
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, logger *zap.Logger) *metricBuilder {

// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
// by calling its Build function. If startTimeMetricRegex is non-empty it overrides the default
// exact-name match used to recognize the process start-time metric.
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger) *metricBuilder {
	var regex *regexp.Regexp
	if startTimeMetricRegex != "" {
		var err error
		regex, err = regexp.Compile(startTimeMetricRegex)
		if err != nil {
			// A nil regex makes matchStartTimeMetric fall back to the exact
			// default-name match; surface the bad pattern instead of silently
			// swallowing it as the previous `regex, _ =` form did.
			logger.Warn("invalid start_time_metric_regex, falling back to exact metric-name match",
				zap.String("regex", startTimeMetricRegex), zap.Error(err))
		}
	}
	return &metricBuilder{
		mc:                   mc,
		metrics:              make([]*metricspb.Metric, 0),
		logger:               logger,
		numTimeseries:        0,
		droppedTimeseries:    0,
		useStartTimeMetric:   useStartTimeMetric,
		startTimeMetricRegex: regex,
	}
}

// matchStartTimeMetric reports whether metricName identifies the metric that
// carries the process start time. A configured regex takes precedence;
// otherwise the name must equal the default startTimeMetricName exactly.
func (b *metricBuilder) matchStartTimeMetric(metricName string) bool {
	if b.startTimeMetricRegex == nil {
		return metricName == startTimeMetricName
	}
	return b.startTimeMetricRegex.MatchString(metricName)
}

// AddDataPoint is for feeding prometheus data complexValue in its processing order
Expand All @@ -103,7 +117,7 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error
b.scrapeLatencyMs = v * 1000
}
return nil
} else if b.useStartTimeMetric && metricName == startTimeMetricName {
} else if b.useStartTimeMetric && b.matchStartTimeMetric(metricName) {
b.startTime = v
}

Expand Down
97 changes: 92 additions & 5 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

const startTs = int64(1555366610000)
const interval = int64(15 * 1000)
const defaultBuilderStartTime = float64(1.0)

var testMetadata = map[string]scrape.MetricMetadata{
"counter_test": {Metric: "counter_test", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
Expand All @@ -44,6 +45,12 @@ var testMetadata = map[string]scrape.MetricMetadata{
"poor_name_count": {Metric: "poor_name_count", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
"up": {Metric: "up", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
"scrape_foo": {Metric: "scrape_foo", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
"example_process_start_time_seconds": {Metric: "example_process_start_time_seconds",
Type: textparse.MetricTypeGauge, Help: "", Unit: ""},
"process_start_time_seconds": {Metric: "process_start_time_seconds",
Type: textparse.MetricTypeGauge, Help: "", Unit: ""},
"badprocess_start_time_seconds": {Metric: "badprocess_start_time_seconds",
Type: textparse.MetricTypeGauge, Help: "", Unit: ""},
}

type testDataPoint struct {
Expand Down Expand Up @@ -90,8 +97,8 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
mc := newMockMetadataCache(testMetadata)
st := startTs
for i, page := range tt.inputs {
b := newMetricBuilder(mc, true, testLogger)
b.startTime = 1.0 // set to a non-zero value
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
pt.t = st
Expand All @@ -106,6 +113,86 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
}
}

// runBuilderStartTimeTests feeds each scraped page of every test case into a
// fresh metricBuilder configured with startTimeMetricRegex, then asserts the
// builder ends up with expectedBuilderStartTime after Build.
func runBuilderStartTimeTests(t *testing.T, tests []buildTestData,
	startTimeMetricRegex string, expectedBuilderStartTime float64) {
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			cache := newMockMetadataCache(testMetadata)
			ts := startTs
			for _, scrapedPage := range tc.inputs {
				builder := newMetricBuilder(cache, true, startTimeMetricRegex,
					testLogger)
				// Seed a non-zero start time so we can tell whether the
				// page's data actually overrode it.
				builder.startTime = defaultBuilderStartTime
				for _, point := range scrapedPage.pts {
					point.t = ts // fixed timestamp for reproducibility
					assert.NoError(t, builder.AddDataPoint(point.lb, point.t, point.v))
				}
				_, _, _, err := builder.Build()
				assert.NoError(t, err)
				assert.EqualValues(t, builder.startTime, expectedBuilderStartTime)
				ts += interval
			}
		})
	}
}

// Test_startTimeMetricMatch verifies that the start-time regex accepts both the
// exact default metric name and prefixed variants, while rejecting near-miss
// names, and that only matches update the builder's start time.
func Test_startTimeMetricMatch(t *testing.T) {
	// Single source of truth for the pattern under test (was duplicated in
	// both runBuilderStartTimeTests calls).
	const startTimeMetricRegex = "^(.+_)*process_start_time_seconds$"
	matchBuilderStartTime := float64(123.456)
	matchTests := []buildTestData{
		{
			name: "prefix_match",
			inputs: []*testScrapedPage{
				{
					pts: []*testDataPoint{
						createDataPoint("example_process_start_time_seconds",
							matchBuilderStartTime, "foo", "bar"),
					},
				},
			},
		},
		{
			name: "match",
			inputs: []*testScrapedPage{
				{
					pts: []*testDataPoint{
						createDataPoint("process_start_time_seconds",
							matchBuilderStartTime, "foo", "bar"),
					},
				},
			},
		},
	}
	nomatchTests := []buildTestData{
		{
			name: "nomatch1",
			inputs: []*testScrapedPage{
				{
					pts: []*testDataPoint{
						createDataPoint("_process_start_time_seconds",
							matchBuilderStartTime, "foo", "bar"),
					},
				},
			},
		},
		{
			name: "nomatch2",
			inputs: []*testScrapedPage{
				{
					pts: []*testDataPoint{
						createDataPoint("subprocess_start_time_seconds",
							matchBuilderStartTime, "foo", "bar"),
					},
				},
			},
		},
	}

	// Matching names must override the seeded start time; non-matching names
	// must leave defaultBuilderStartTime untouched.
	runBuilderStartTimeTests(t, matchTests, startTimeMetricRegex, matchBuilderStartTime)
	runBuilderStartTimeTests(t, nomatchTests, startTimeMetricRegex, defaultBuilderStartTime)
}

func Test_metricBuilder_counters(t *testing.T) {
tests := []buildTestData{
{
Expand Down Expand Up @@ -1055,7 +1142,7 @@ func Test_metricBuilder_skipped(t *testing.T) {
func Test_metricBuilder_baddata(t *testing.T) {
t.Run("empty-metric-name", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, testLogger)
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound {
t.Error("expecting errMetricNameNotFound error, but get nil")
Expand All @@ -1069,7 +1156,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, testLogger)
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand All @@ -1078,7 +1165,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, testLogger)
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand Down
40 changes: 21 additions & 19 deletions receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,30 @@ type OcaStore interface {

// OpenCensus Store for prometheus
type ocaStore struct {
running int32
logger *zap.Logger
sink consumer.MetricsConsumerOld
mc *mService
once *sync.Once
ctx context.Context
jobsMap *JobsMap
useStartTimeMetric bool
receiverName string
running int32
logger *zap.Logger
sink consumer.MetricsConsumerOld
mc *mService
once *sync.Once
ctx context.Context
jobsMap *JobsMap
useStartTimeMetric bool
startTimeMetricRegex string
receiverName string
}

// NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumerOld, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string) OcaStore {
// NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable.
// startTimeMetricRegex is forwarded to each transaction's metric builder.
func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumerOld, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverName string) OcaStore {
	store := ocaStore{
		running:              runningStateInit,
		ctx:                  ctx,
		sink:                 sink,
		logger:               logger,
		once:                 new(sync.Once),
		jobsMap:              jobsMap,
		useStartTimeMetric:   useStartTimeMetric,
		startTimeMetricRegex: startTimeMetricRegex,
		receiverName:         receiverName,
	}
	return &store
}

Expand All @@ -83,7 +85,7 @@ func (o *ocaStore) SetScrapeManager(scrapeManager *scrape.Manager) {
func (o *ocaStore) Appender() storage.Appender {
state := atomic.LoadInt32(&o.running)
if state == runningStateReady {
return newTransaction(o.ctx, o.jobsMap, o.useStartTimeMetric, o.receiverName, o.mc, o.sink, o.logger)
return newTransaction(o.ctx, o.jobsMap, o.useStartTimeMetric, o.startTimeMetricRegex, o.receiverName, o.mc, o.sink, o.logger)
} else if state == runningStateInit {
panic("ScrapeManager is not set")
}
Expand Down
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/internal/ocastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

func TestOcaStore(t *testing.T) {

o := NewOcaStore(context.Background(), nil, nil, nil, false, "prometheus")
o := NewOcaStore(context.Background(), nil, nil, nil, false, "", "prometheus")
o.SetScrapeManager(&scrape.Manager{})

app := o.Appender()
Expand Down
50 changes: 26 additions & 24 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,34 @@ var errNoJobInstance = errors.New("job or instance cannot be found from labels")
// will be flush to the downstream consumer, or Rollback, which means discard all the data, is called and all data
// points are discarded.
type transaction struct {
id int64
ctx context.Context
isNew bool
sink consumer.MetricsConsumerOld
job string
instance string
jobsMap *JobsMap
useStartTimeMetric bool
receiverName string
ms MetadataService
node *commonpb.Node
metricBuilder *metricBuilder
logger *zap.Logger
id int64
ctx context.Context
isNew bool
sink consumer.MetricsConsumerOld
job string
instance string
jobsMap *JobsMap
useStartTimeMetric bool
startTimeMetricRegex string
receiverName string
ms MetadataService
node *commonpb.Node
metricBuilder *metricBuilder
logger *zap.Logger
}

func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string, ms MetadataService, sink consumer.MetricsConsumerOld, logger *zap.Logger) *transaction {
// newTransaction creates a transaction for one scrape round, tagging it with a
// fresh id from the package-level sequence. startTimeMetricRegex is passed
// through to the metric builder created in initTransaction.
func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverName string, ms MetadataService, sink consumer.MetricsConsumerOld, logger *zap.Logger) *transaction {
	tr := transaction{
		id:                   atomic.AddInt64(&idSeq, 1),
		ctx:                  ctx,
		isNew:                true,
		sink:                 sink,
		jobsMap:              jobsMap,
		useStartTimeMetric:   useStartTimeMetric,
		startTimeMetricRegex: startTimeMetricRegex,
		receiverName:         receiverName,
		ms:                   ms,
		logger:               logger,
	}
	return &tr
}

Expand Down Expand Up @@ -132,7 +134,7 @@ func (tr *transaction) initTransaction(ls labels.Labels) error {
tr.instance = instance
}
tr.node = createNode(job, instance, mc.SharedLabels().Get(model.SchemeLabel))
tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.logger)
tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.startTimeMetricRegex, tr.logger)
tr.isNew = false
return nil
}
Expand Down
Loading

0 comments on commit d1cdd6c

Please sign in to comment.