Skip to content

Commit

Permalink
Added blocking registry, with raw collector and transactional handler.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Oct 24, 2021
1 parent fdd407a commit 3dcf61c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 24 deletions.
58 changes: 46 additions & 12 deletions prometheus/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"sort"
"sync"
"time"

"github.com/cespare/xxhash/v2"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -80,30 +81,42 @@ func (s *CollectSession) Commit() {
s.c.pendingSession = false
}

// NewMetric ...
func (s *CollectSession) MustAddMetric(fqName, help string, labelNames, labelValues []string, valueType ValueType, value float64, ts *time.Time) {
if err := s.AddMetric(fqName, help, labelNames, labelValues, valueType, value, ts); err != nil {
panic(err)
}
}

// AddMetric ...
// TODO(bwplotka): Add validation.
func (s *CollectSession) NewMetric(fqName, help string, labelNames, labelValues []string, valueType ValueType, value float64) error {
func (s *CollectSession) AddMetric(fqName, help string, labelNames, labelValues []string, valueType ValueType, value float64, ts *time.Time) error {
if s.closed {
return errors.New("new metric: collect session is closed, but was attempted to be used")
}

if !sort.StringsAreSorted(labelNames) {
return errors.New("new metric: label names has to be sorted")
}
// Label names can be unsorted, will be sorting them later. The only implication is cachability if
// consumer provide non-deterministic order of those (unlikely since label values has to be matched.

if len(labelNames) != len(labelValues) {
return errors.New("new metric: label name has different len than values")
}

d, ok := s.c.metricFamilyByName[fqName]
d, ok := s.currentByName[fqName]
if !ok {
d, ok = s.c.metricFamilyByName[fqName]
if ok {
d.Metric = d.Metric[:0]
}
}

if !ok {
// TODO(bwplotka): Validate?
d = &dto.MetricFamily{}
d.Name = proto.String(fqName)
d.Type = valueType.ToDTO()
d.Help = proto.String(help)
} else {
// TODO(bwplotka): Validate if same family.
// TODO(bwplotka): Validate if same family?
d.Type = valueType.ToDTO()
d.Help = proto.String(help)
}
Expand All @@ -120,6 +133,9 @@ func (s *CollectSession) NewMetric(fqName, help string, labelNames, labelValues
}
hSum := h.Sum64()

if _, ok := s.currentMetrics[hSum]; ok {
return fmt.Errorf("found duplicate metric (same labels and values) to add %v", fqName)
}
m, ok := s.c.metrics[hSum]
if !ok {
m = &dto.Metric{
Expand Down Expand Up @@ -166,23 +182,28 @@ func (s *CollectSession) NewMetric(fqName, help string, labelNames, labelValues
return fmt.Errorf("unsupported value type %v", valueType)
}

m.TimestampMs = nil
if ts != nil {
m.TimestampMs = proto.Int64(ts.Unix()*1000 + int64(ts.Nanosecond()/1000000))
}

// Will be sorted later.
d.Metric = append(d.Metric, m)
return nil
}

type BlockingRegistry struct {
Gatherer
*Registry

// rawCollector represents special collectors which requires blocking collect for the whole duration
// of returned dto.MetricFamily usage.
rawCollectors []rawCollector
mu sync.Mutex
}

func NewBlockingRegistry(g Gatherer) *BlockingRegistry {
func NewBlockingRegistry() *BlockingRegistry {
return &BlockingRegistry{
Gatherer: g,
Registry: NewRegistry(),
}
}

Expand All @@ -203,9 +224,9 @@ func (b *BlockingRegistry) MustRegisterRaw(r rawCollector) {
}

func (b *BlockingRegistry) Gather() (_ []*dto.MetricFamily, done func(), err error) {
mfs, err := b.Gatherer.Gather()

b.mu.Lock()
mfs, err := b.Registry.Gather()

// TODO(bwplotka): Returned mfs are sorted, so sort raw ones and inject?
// TODO(bwplotka): Implement concurrency for those?
for _, r := range b.rawCollectors {
Expand All @@ -225,3 +246,16 @@ type TransactionalGatherer interface {
// Gather ...
Gather() (_ []*dto.MetricFamily, done func(), err error)
}

func ToTransactionalGatherer(g Gatherer) TransactionalGatherer {
return &noTransactionGatherer{g: g}
}

type noTransactionGatherer struct {
g Gatherer
}

func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
mfs, err := g.g.Gather()
return mfs, func() {}, err
}
12 changes: 1 addition & 11 deletions prometheus/promhttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"sync"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -85,16 +84,7 @@ func Handler() http.Handler {
// instrumentation. Use the InstrumentMetricHandler function to apply the same
// kind of instrumentation as it is used by the Handler function.
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
return HandlerForTransactional(&noTransactionGatherer{reg: reg}, opts)
}

type noTransactionGatherer struct {
reg prometheus.Gatherer
}

func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
mfs, err := g.reg.Gather()
return mfs, func() {}, err
return HandlerForTransactional(prometheus.ToTransactionalGatherer(reg), opts)
}

func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler {
Expand Down
11 changes: 10 additions & 1 deletion prometheus/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,16 @@ func CollectAndCompare(c prometheus.Collector, expected io.Reader, metricNames .
// exposition format. If any metricNames are provided, only metrics with those
// names are compared.
func GatherAndCompare(g prometheus.Gatherer, expected io.Reader, metricNames ...string) error {
got, err := g.Gather()
return TransactionalGatherAndCompare(prometheus.ToTransactionalGatherer(g), expected, metricNames...)
}

// TransactionalGatherAndCompare gathers all metrics from the provided Gatherer and compares
// it to an expected output read from the provided Reader in the Prometheus text
// exposition format. If any metricNames are provided, only metrics with those
// names are compared.
func TransactionalGatherAndCompare(g prometheus.TransactionalGatherer, expected io.Reader, metricNames ...string) error {
got, done, err := g.Gather()
defer done()
if err != nil {
return fmt.Errorf("gathering metrics failed: %s", err)
}
Expand Down

0 comments on commit 3dcf61c

Please sign in to comment.