Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix]: Fix histogram merge #324

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions collectors/monitoring_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type ConstMetric struct {
type HistogramMetric struct {
FqName string
LabelKeys []string
Mean float64
Sum float64
Count uint64
Buckets map[float64]uint64
LabelValues []string
Expand All @@ -100,15 +100,26 @@ type HistogramMetric struct {
KeysHash uint64
}

func (h *HistogramMetric) MergeHistogram(other *HistogramMetric) {
// Increment totals based on incoming totals
h.Sum += other.Sum
h.Count += other.Count

// Merge the buckets from existing in to current
for key, value := range other.Buckets {
h.Buckets[key] += value
}
}

func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string, metricKind string) {
fqName := buildFQName(timeSeries)

histogramSum := dist.Mean * float64(dist.Count)
var v HistogramMetric
if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) {
v = HistogramMetric{
FqName: fqName,
LabelKeys: labelKeys,
Mean: dist.Mean,
Sum: histogramSum,
Count: uint64(dist.Count),
Buckets: buckets,
LabelValues: labelValues,
Expand All @@ -133,16 +144,16 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time
return
}

t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist.Mean, uint64(dist.Count), buckets, labelValues)
t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, histogramSum, uint64(dist.Count), buckets, labelValues)
}

func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, mean float64, count uint64, buckets map[float64]uint64, labelValues []string) prometheus.Metric {
func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, sum float64, count uint64, buckets map[float64]uint64, labelValues []string) prometheus.Metric {
return prometheus.NewMetricWithTimestamp(
reportTime,
prometheus.MustNewConstHistogram(
t.newMetricDesc(fqName, labelKeys),
count,
mean*float64(count), // Stackdriver does not provide the sum, but we can fake it
sum,
buckets,
labelValues...,
),
Expand Down Expand Up @@ -249,7 +260,7 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi
}
}
for _, v := range vs {
t.ch <- t.newConstHistogram(v.FqName, v.ReportTime, v.LabelKeys, v.Mean, v.Count, v.Buckets, v.LabelValues)
t.ch <- t.newConstHistogram(v.FqName, v.ReportTime, v.LabelKeys, v.Sum, v.Count, v.Buckets, v.LabelValues)
}
}
}
Expand Down Expand Up @@ -314,7 +325,7 @@ func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime tim
collected.FqName,
collected.ReportTime,
collected.LabelKeys,
collected.Mean,
collected.Sum,
collected.Count,
collected.Buckets,
collected.LabelValues,
Expand Down
27 changes: 4 additions & 23 deletions delta/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func (s *InMemoryHistogramStore) Increment(metricDescriptor *monitoring.MetricDe

if existing.ReportTime.Before(currentValue.ReportTime) {
level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime)
entry.Collected[key] = mergeHistograms(existing, currentValue)
currentValue.MergeHistogram(existing)
// Replace the existing histogram by the new one after merging it.
entry.Collected[key] = currentValue
return
}

Expand All @@ -101,27 +103,6 @@ func toHistogramKey(hist *collectors.HistogramMetric) uint64 {
return h
}

func mergeHistograms(existing *collectors.HistogramMetric, current *collectors.HistogramMetric) *collectors.HistogramMetric {
for key, value := range existing.Buckets {
current.Buckets[key] += value
}

// Calculate a new mean and overall count
mean := existing.Mean
mean += current.Mean
mean /= 2

var count uint64
for _, v := range current.Buckets {
count += v
}

current.Mean = mean
current.Count = count

return current
}

func (s *InMemoryHistogramStore) ListMetrics(metricDescriptorName string) []*collectors.HistogramMetric {
var output []*collectors.HistogramMetric
now := time.Now()
Expand All @@ -136,7 +117,7 @@ func (s *InMemoryHistogramStore) ListMetrics(metricDescriptorName string) []*col
entry.mutex.Lock()
defer entry.mutex.Unlock()
for key, collected := range entry.Collected {
//Scan and remove metrics which are outside the TTL
// Scan and remove metrics which are outside the TTL
if ttlWindowStart.After(collected.CollectionTime) {
level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.FqName)
delete(entry.Collected, key)
Expand Down
34 changes: 32 additions & 2 deletions delta/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ var _ = Describe("HistogramStore", func() {
var store *delta.InMemoryHistogramStore
var histogram *collectors.HistogramMetric
descriptor := &monitoring.MetricDescriptor{Name: "This is a metric"}
bucketKey := 1.00000000000000000001
bucketValue := uint64(1000)

BeforeEach(func() {
store = delta.NewInMemoryHistogramStore(promlog.New(&promlog.Config{}), time.Minute)
histogram = &collectors.HistogramMetric{
FqName: "histogram_name",
LabelKeys: []string{"labelKey"},
Mean: 10,
Sum: 10,
Count: 100,
Buckets: map[float64]uint64{1.00000000000000000001: 1000},
Buckets: map[float64]uint64{bucketKey: bucketValue},
LabelValues: []string{"labelValue"},
ReportTime: time.Now().Truncate(time.Second),
CollectionTime: time.Now().Truncate(time.Second),
Expand All @@ -53,6 +55,34 @@ var _ = Describe("HistogramStore", func() {
Expect(metrics[0]).To(Equal(histogram))
})

It("can merge histograms", func() {
store.Increment(descriptor, histogram)

// Shallow copy and change report time so they will merge
nextValue := &collectors.HistogramMetric{
FqName: "histogram_name",
LabelKeys: []string{"labelKey"},
Sum: 10,
Count: 100,
Buckets: map[float64]uint64{bucketKey: bucketValue},
LabelValues: []string{"labelValue"},
ReportTime: time.Now().Truncate(time.Second).Add(time.Second),
CollectionTime: time.Now().Truncate(time.Second),
KeysHash: 8765,
}

store.Increment(descriptor, nextValue)

metrics := store.ListMetrics(descriptor.Name)

Expect(len(metrics)).To(Equal(1))
histogram := metrics[0]
Expect(histogram.Count).To(Equal(uint64(200)))
Expect(histogram.Sum).To(Equal(20.0))
Expect(len(histogram.Buckets)).To(Equal(1))
Expect(histogram.Buckets[bucketKey]).To(Equal(bucketValue * 2))
})

It("will remove histograms outside of TTL", func() {
histogram.CollectionTime = histogram.CollectionTime.Add(-time.Hour)

Expand Down