Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a new config StartTimeMetricRegex #1511

Merged
merged 1 commit into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions receiver/prometheusreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
BufferPeriod time.Duration `mapstructure:"buffer_period"`
BufferCount int `mapstructure:"buffer_count"`
UseStartTimeMetric bool `mapstructure:"use_start_time_metric"`
StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"`

// ConfigPlaceholder is just an entry to make the configuration pass a check
// that requires that all keys present in the config actually exist on the
Expand Down
1 change: 1 addition & 0 deletions receiver/prometheusreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, r1.PrometheusConfig.ScrapeConfigs[0].JobName, "demo")
assert.Equal(t, time.Duration(r1.PrometheusConfig.ScrapeConfigs[0].ScrapeInterval), 5*time.Second)
assert.Equal(t, r1.UseStartTimeMetric, true)
assert.Equal(t, r1.StartTimeMetricRegex, "^(.+_)*process_start_time_seconds$")
}

func TestLoadConfigWithEnvVar(t *testing.T) {
Expand Down
56 changes: 35 additions & 21 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package internal
import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"

Expand Down Expand Up @@ -51,33 +52,46 @@ var (
)

type metricBuilder struct {
hasData bool
hasInternalMetric bool
mc MetadataCache
metrics []*metricspb.Metric
numTimeseries int
droppedTimeseries int
useStartTimeMetric bool
startTime float64
scrapeLatencyMs float64
scrapeStatus string
logger *zap.Logger
currentMf MetricFamily
hasData bool
hasInternalMetric bool
mc MetadataCache
metrics []*metricspb.Metric
numTimeseries int
droppedTimeseries int
useStartTimeMetric bool
startTimeMetricRegex *regexp.Regexp
startTime float64
scrapeLatencyMs float64
scrapeStatus string
logger *zap.Logger
currentMf MetricFamily
}

// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
// by calling its Build function
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, logger *zap.Logger) *metricBuilder {

func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger) *metricBuilder {
var regex *regexp.Regexp
if startTimeMetricRegex != "" {
regex, _ = regexp.Compile(startTimeMetricRegex)
}
return &metricBuilder{
mc: mc,
metrics: make([]*metricspb.Metric, 0),
logger: logger,
numTimeseries: 0,
droppedTimeseries: 0,
useStartTimeMetric: useStartTimeMetric,
mc: mc,
metrics: make([]*metricspb.Metric, 0),
logger: logger,
numTimeseries: 0,
droppedTimeseries: 0,
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: regex,
}
}

func (b *metricBuilder) matchStartTimeMetric(metricName string) bool {
if b.startTimeMetricRegex != nil {
return b.startTimeMetricRegex.MatchString(metricName)
}

return metricName == startTimeMetricName
}

// AddDataPoint is for feeding prometheus data complexValue in its processing order
Expand All @@ -104,7 +118,7 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error
b.scrapeLatencyMs = v * 1000
}
return nil
case b.useStartTimeMetric && metricName == startTimeMetricName:
case b.useStartTimeMetric && b.matchStartTimeMetric(metricName):
b.startTime = v
}

Expand Down
96 changes: 91 additions & 5 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

const startTs = int64(1555366610000)
const interval = int64(15 * 1000)
const defaultBuilderStartTime = float64(1.0)

var testMetadata = map[string]scrape.MetricMetadata{
"counter_test": {Metric: "counter_test", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
Expand All @@ -44,6 +45,12 @@ var testMetadata = map[string]scrape.MetricMetadata{
"poor_name_count": {Metric: "poor_name_count", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
"up": {Metric: "up", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
"scrape_foo": {Metric: "scrape_foo", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
"example_process_start_time_seconds": {Metric: "example_process_start_time_seconds",
Type: textparse.MetricTypeGauge, Help: "", Unit: ""},
"process_start_time_seconds": {Metric: "process_start_time_seconds",
Type: textparse.MetricTypeGauge, Help: "", Unit: ""},
"badprocess_start_time_seconds": {Metric: "badprocess_start_time_seconds",
Type: textparse.MetricTypeGauge, Help: "", Unit: ""},
}

type testDataPoint struct {
Expand Down Expand Up @@ -90,8 +97,8 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
mc := newMockMetadataCache(testMetadata)
st := startTs
for i, page := range tt.inputs {
b := newMetricBuilder(mc, true, testLogger)
b.startTime = 1.0 // set to a non-zero value
b := newMetricBuilder(mc, true, "", testLogger)
Weil0ng marked this conversation as resolved.
Show resolved Hide resolved
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
pt.t = st
Expand All @@ -106,6 +113,85 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
}
}

func runBuilderStartTimeTests(t *testing.T, tests []buildTestData,
startTimeMetricRegex string, expectedBuilderStartTime float64) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
st := startTs
for _, page := range tt.inputs {
b := newMetricBuilder(mc, true, startTimeMetricRegex,
testLogger)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
pt.t = st
assert.NoError(t, b.AddDataPoint(pt.lb, pt.t, pt.v))
}
_, _, _, err := b.Build()
assert.NoError(t, err)
assert.EqualValues(t, b.startTime, expectedBuilderStartTime)
st += interval
}
})
}
}

func Test_startTimeMetricMatch(t *testing.T) {
matchBuilderStartTime := float64(123.456)
matchTests := []buildTestData{
{
name: "prefix_match",
inputs: []*testScrapedPage{
{
pts: []*testDataPoint{
createDataPoint("example_process_start_time_seconds",
matchBuilderStartTime, "foo", "bar"),
},
},
},
},
{
name: "match",
inputs: []*testScrapedPage{
{
pts: []*testDataPoint{
createDataPoint("process_start_time_seconds",
matchBuilderStartTime, "foo", "bar"),
},
},
},
},
}
nomatchTests := []buildTestData{
{
name: "nomatch1",
inputs: []*testScrapedPage{
{
pts: []*testDataPoint{
createDataPoint("_process_start_time_seconds",
matchBuilderStartTime, "foo", "bar"),
},
},
},
},
{
name: "nomatch2",
inputs: []*testScrapedPage{
{
pts: []*testDataPoint{
createDataPoint("subprocess_start_time_seconds",
matchBuilderStartTime, "foo", "bar"),
},
},
},
},
}

runBuilderStartTimeTests(t, matchTests, "^(.+_)*process_start_time_seconds$", matchBuilderStartTime)
runBuilderStartTimeTests(t, nomatchTests, "^(.+_)*process_start_time_seconds$", defaultBuilderStartTime)
}

func Test_metricBuilder_counters(t *testing.T) {
tests := []buildTestData{
{
Expand Down Expand Up @@ -1055,7 +1141,7 @@ func Test_metricBuilder_skipped(t *testing.T) {
func Test_metricBuilder_baddata(t *testing.T) {
t.Run("empty-metric-name", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, testLogger)
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound {
t.Error("expecting errMetricNameNotFound error, but get nil")
Expand All @@ -1069,7 +1155,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, testLogger)
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand All @@ -1078,7 +1164,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, testLogger)
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand Down
40 changes: 21 additions & 19 deletions receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,30 @@ type OcaStore interface {

// OpenCensus Store for prometheus
type ocaStore struct {
running int32
logger *zap.Logger
sink consumer.MetricsConsumer
mc *mService
once *sync.Once
ctx context.Context
jobsMap *JobsMap
useStartTimeMetric bool
receiverName string
running int32
logger *zap.Logger
sink consumer.MetricsConsumer
mc *mService
once *sync.Once
ctx context.Context
jobsMap *JobsMap
useStartTimeMetric bool
startTimeMetricRegex string
receiverName string
}

// NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string) OcaStore {
func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverName string) OcaStore {
return &ocaStore{
running: runningStateInit,
ctx: ctx,
sink: sink,
logger: logger,
once: &sync.Once{},
jobsMap: jobsMap,
useStartTimeMetric: useStartTimeMetric,
receiverName: receiverName,
running: runningStateInit,
ctx: ctx,
sink: sink,
logger: logger,
once: &sync.Once{},
jobsMap: jobsMap,
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: startTimeMetricRegex,
receiverName: receiverName,
}
}

Expand All @@ -83,7 +85,7 @@ func (o *ocaStore) SetScrapeManager(scrapeManager *scrape.Manager) {
func (o *ocaStore) Appender() storage.Appender {
state := atomic.LoadInt32(&o.running)
if state == runningStateReady {
return newTransaction(o.ctx, o.jobsMap, o.useStartTimeMetric, o.receiverName, o.mc, o.sink, o.logger)
return newTransaction(o.ctx, o.jobsMap, o.useStartTimeMetric, o.startTimeMetricRegex, o.receiverName, o.mc, o.sink, o.logger)
} else if state == runningStateInit {
panic("ScrapeManager is not set")
}
Expand Down
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/internal/ocastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

func TestOcaStore(t *testing.T) {

o := NewOcaStore(context.Background(), nil, nil, nil, false, "prometheus")
o := NewOcaStore(context.Background(), nil, nil, nil, false, "", "prometheus")
o.SetScrapeManager(&scrape.Manager{})

app := o.Appender()
Expand Down
50 changes: 26 additions & 24 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,32 +57,34 @@ var errNoJobInstance = errors.New("job or instance cannot be found from labels")
// will be flush to the downstream consumer, or Rollback, which means discard all the data, is called and all data
// points are discarded.
type transaction struct {
id int64
ctx context.Context
isNew bool
sink consumer.MetricsConsumer
job string
instance string
jobsMap *JobsMap
useStartTimeMetric bool
receiverName string
ms MetadataService
node *commonpb.Node
metricBuilder *metricBuilder
logger *zap.Logger
id int64
ctx context.Context
isNew bool
sink consumer.MetricsConsumer
job string
instance string
jobsMap *JobsMap
useStartTimeMetric bool
startTimeMetricRegex string
receiverName string
ms MetadataService
node *commonpb.Node
metricBuilder *metricBuilder
logger *zap.Logger
}

func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, receiverName string, ms MetadataService, sink consumer.MetricsConsumer, logger *zap.Logger) *transaction {
func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverName string, ms MetadataService, sink consumer.MetricsConsumer, logger *zap.Logger) *transaction {
return &transaction{
id: atomic.AddInt64(&idSeq, 1),
ctx: ctx,
isNew: true,
sink: sink,
jobsMap: jobsMap,
useStartTimeMetric: useStartTimeMetric,
receiverName: receiverName,
ms: ms,
logger: logger,
id: atomic.AddInt64(&idSeq, 1),
ctx: ctx,
isNew: true,
sink: sink,
jobsMap: jobsMap,
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: startTimeMetricRegex,
receiverName: receiverName,
ms: ms,
logger: logger,
}
}

Expand Down Expand Up @@ -133,7 +135,7 @@ func (tr *transaction) initTransaction(ls labels.Labels) error {
tr.instance = instance
}
tr.node = createNode(job, instance, mc.SharedLabels().Get(model.SchemeLabel))
tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.logger)
tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.startTimeMetricRegex, tr.logger)
tr.isNew = false
return nil
}
Expand Down
Loading