Skip to content

Commit

Permalink
[Bugfix]: Fix histogram merge (#324)
Browse files Browse the repository at this point in the history
* Switch to computing sum for histograms and track that for deltas vs trying to merge two means

Signed-off-by: William Dumont <william.dumont@grafana.com>

* Fix missing sum usage and add a test to cover merge

Signed-off-by: William Dumont <william.dumont@grafana.com>

* refactor mergeHistograms as a method of HistogramMetric

Signed-off-by: William Dumont <william.dumont@grafana.com>

---------

Signed-off-by: William Dumont <william.dumont@grafana.com>
Co-authored-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>
  • Loading branch information
wildum and kgeckhart committed May 14, 2024
1 parent 205417f commit f002db5
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 33 deletions.
27 changes: 19 additions & 8 deletions collectors/monitoring_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type ConstMetric struct {
type HistogramMetric struct {
FqName string
LabelKeys []string
Mean float64
Sum float64
Count uint64
Buckets map[float64]uint64
LabelValues []string
Expand All @@ -100,15 +100,26 @@ type HistogramMetric struct {
KeysHash uint64
}

func (h *HistogramMetric) MergeHistogram(other *HistogramMetric) {
// Increment totals based on incoming totals
h.Sum += other.Sum
h.Count += other.Count

// Merge the buckets from existing in to current
for key, value := range other.Buckets {
h.Buckets[key] += value
}
}

func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string, metricKind string) {
fqName := buildFQName(timeSeries)

histogramSum := dist.Mean * float64(dist.Count)
var v HistogramMetric
if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) {
v = HistogramMetric{
FqName: fqName,
LabelKeys: labelKeys,
Mean: dist.Mean,
Sum: histogramSum,
Count: uint64(dist.Count),
Buckets: buckets,
LabelValues: labelValues,
Expand All @@ -133,16 +144,16 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time
return
}

t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist.Mean, uint64(dist.Count), buckets, labelValues)
t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, histogramSum, uint64(dist.Count), buckets, labelValues)
}

func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, mean float64, count uint64, buckets map[float64]uint64, labelValues []string) prometheus.Metric {
func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, sum float64, count uint64, buckets map[float64]uint64, labelValues []string) prometheus.Metric {
return prometheus.NewMetricWithTimestamp(
reportTime,
prometheus.MustNewConstHistogram(
t.newMetricDesc(fqName, labelKeys),
count,
mean*float64(count), // Stackdriver does not provide the sum, but we can fake it
sum,
buckets,
labelValues...,
),
Expand Down Expand Up @@ -249,7 +260,7 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi
}
}
for _, v := range vs {
t.ch <- t.newConstHistogram(v.FqName, v.ReportTime, v.LabelKeys, v.Mean, v.Count, v.Buckets, v.LabelValues)
t.ch <- t.newConstHistogram(v.FqName, v.ReportTime, v.LabelKeys, v.Sum, v.Count, v.Buckets, v.LabelValues)
}
}
}
Expand Down Expand Up @@ -314,7 +325,7 @@ func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime tim
collected.FqName,
collected.ReportTime,
collected.LabelKeys,
collected.Mean,
collected.Sum,
collected.Count,
collected.Buckets,
collected.LabelValues,
Expand Down
27 changes: 4 additions & 23 deletions delta/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func (s *InMemoryHistogramStore) Increment(metricDescriptor *monitoring.MetricDe

if existing.ReportTime.Before(currentValue.ReportTime) {
level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime)
entry.Collected[key] = mergeHistograms(existing, currentValue)
currentValue.MergeHistogram(existing)
// Replace the existing histogram by the new one after merging it.
entry.Collected[key] = currentValue
return
}

Expand All @@ -101,27 +103,6 @@ func toHistogramKey(hist *collectors.HistogramMetric) uint64 {
return h
}

func mergeHistograms(existing *collectors.HistogramMetric, current *collectors.HistogramMetric) *collectors.HistogramMetric {
for key, value := range existing.Buckets {
current.Buckets[key] += value
}

// Calculate a new mean and overall count
mean := existing.Mean
mean += current.Mean
mean /= 2

var count uint64
for _, v := range current.Buckets {
count += v
}

current.Mean = mean
current.Count = count

return current
}

func (s *InMemoryHistogramStore) ListMetrics(metricDescriptorName string) []*collectors.HistogramMetric {
var output []*collectors.HistogramMetric
now := time.Now()
Expand All @@ -136,7 +117,7 @@ func (s *InMemoryHistogramStore) ListMetrics(metricDescriptorName string) []*col
entry.mutex.Lock()
defer entry.mutex.Unlock()
for key, collected := range entry.Collected {
//Scan and remove metrics which are outside the TTL
// Scan and remove metrics which are outside the TTL
if ttlWindowStart.After(collected.CollectionTime) {
level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.FqName)
delete(entry.Collected, key)
Expand Down
34 changes: 32 additions & 2 deletions delta/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ var _ = Describe("HistogramStore", func() {
var store *delta.InMemoryHistogramStore
var histogram *collectors.HistogramMetric
descriptor := &monitoring.MetricDescriptor{Name: "This is a metric"}
bucketKey := 1.00000000000000000001
bucketValue := uint64(1000)

BeforeEach(func() {
store = delta.NewInMemoryHistogramStore(promlog.New(&promlog.Config{}), time.Minute)
histogram = &collectors.HistogramMetric{
FqName: "histogram_name",
LabelKeys: []string{"labelKey"},
Mean: 10,
Sum: 10,
Count: 100,
Buckets: map[float64]uint64{1.00000000000000000001: 1000},
Buckets: map[float64]uint64{bucketKey: bucketValue},
LabelValues: []string{"labelValue"},
ReportTime: time.Now().Truncate(time.Second),
CollectionTime: time.Now().Truncate(time.Second),
Expand All @@ -53,6 +55,34 @@ var _ = Describe("HistogramStore", func() {
Expect(metrics[0]).To(Equal(histogram))
})

It("can merge histograms", func() {
store.Increment(descriptor, histogram)

// Shallow copy and change report time so they will merge
nextValue := &collectors.HistogramMetric{
FqName: "histogram_name",
LabelKeys: []string{"labelKey"},
Sum: 10,
Count: 100,
Buckets: map[float64]uint64{bucketKey: bucketValue},
LabelValues: []string{"labelValue"},
ReportTime: time.Now().Truncate(time.Second).Add(time.Second),
CollectionTime: time.Now().Truncate(time.Second),
KeysHash: 8765,
}

store.Increment(descriptor, nextValue)

metrics := store.ListMetrics(descriptor.Name)

Expect(len(metrics)).To(Equal(1))
histogram := metrics[0]
Expect(histogram.Count).To(Equal(uint64(200)))
Expect(histogram.Sum).To(Equal(20.0))
Expect(len(histogram.Buckets)).To(Equal(1))
Expect(histogram.Buckets[bucketKey]).To(Equal(bucketValue * 2))
})

It("will remove histograms outside of TTL", func() {
histogram.CollectionTime = histogram.CollectionTime.Add(-time.Hour)

Expand Down

0 comments on commit f002db5

Please sign in to comment.