Skip to content

Commit

Permalink
Align DeltaHistogram in SignalFx registry with count and total (#3799)
Browse files Browse the repository at this point in the history
Count and total for Step Meters have been made to roll over at the start of the step. SignalFx's DeltaHistogram however did not align to this, resulting in a histogram that does not match with the count/total values as it should. This makes those align and generally updates the implementation to align better with the behavior provided to StepMeterRegistry implementations.

Closes gh-3774
  • Loading branch information
lenin-jaganathan committed Jun 12, 2023
1 parent 693ffa8 commit 03a227a
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,38 @@
*/
final class CumulativeHistogramConfigUtil {

static DistributionStatisticConfig updateConfig(DistributionStatisticConfig distributionStatisticConfig) {
private static final double[] EMPTY_SLO = new double[0];

static DistributionStatisticConfig updateConfig(DistributionStatisticConfig distributionStatisticConfig,
boolean isDelta) {
double[] sloBoundaries = distributionStatisticConfig.getServiceLevelObjectiveBoundaries();
if (sloBoundaries == null || sloBoundaries.length == 0) {
return distributionStatisticConfig;
}
double[] newSloBoundaries = sloBoundaries;
// Add the +Inf bucket since the "count" resets every export.
if (!isPositiveInf(sloBoundaries[sloBoundaries.length - 1])) {
newSloBoundaries = Arrays.copyOf(sloBoundaries, sloBoundaries.length + 1);
newSloBoundaries[newSloBoundaries.length - 1] = Double.MAX_VALUE;
}

return DistributionStatisticConfig.builder()
// Set the expiration duration for the histogram counts to be effectively
// infinite.
// Without this, the counts are reset every expiry duration.
.expiry(Duration.ofNanos(Long.MAX_VALUE)) // effectively infinite
.bufferLength(1)
.serviceLevelObjectives(newSloBoundaries)
// If delta Histograms are enabled, empty the slo's and use
// StepBucketHistogram.
.serviceLevelObjectives(isDelta ? EMPTY_SLO : addPositiveInfBucket(sloBoundaries))
.build()
.merge(distributionStatisticConfig);
}

static double[] addPositiveInfBucket(double[] sloBoundaries) {
double[] newSloBoundaries = sloBoundaries;
// Add the +Inf bucket since the "count" resets every export.
if (!isPositiveInf(sloBoundaries[sloBoundaries.length - 1])) {
newSloBoundaries = Arrays.copyOf(sloBoundaries, sloBoundaries.length + 1);
newSloBoundaries[newSloBoundaries.length - 1] = Double.MAX_VALUE;
}
return newSloBoundaries;
}

private static boolean isPositiveInf(double bucket) {
return bucket == Double.POSITIVE_INFINITY || bucket == Double.MAX_VALUE || (long) bucket == Long.MAX_VALUE;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,88 +16,72 @@
package io.micrometer.signalfx;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.micrometer.core.instrument.step.StepTuple2;

import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
import io.micrometer.core.instrument.distribution.StepBucketHistogram;
import io.micrometer.core.instrument.step.StepDistributionSummary;

/**
* This class is mostly the same as
* {@link io.micrometer.core.instrument.step.StepDistributionSummary}, with one notable
* difference: the {@link DistributionStatisticConfig} is modified before being passed to
* the super class constructor - that forces the histogram generated by this meter to be
* cumulative.
* A StepDistributionSummary which provides support for multiple flavours of Histograms to
* be recorded based on {@link SignalFxConfig#publishCumulativeHistogram()} and
* {@link SignalFxConfig#publishDeltaHistogram()}.
*
* @author Bogdan Drutu
* @author Mateusz Rzeszutek
* @author Lenin Jaganathan
*/
final class SignalfxDistributionSummary extends AbstractDistributionSummary {

private final LongAdder count = new LongAdder();

private final DoubleAdder total = new DoubleAdder();

private final StepTuple2<Long, Double> countTotal;

private final TimeWindowMax max;
final class SignalfxDistributionSummary extends StepDistributionSummary {

@Nullable
private final DeltaHistogramCounts deltaHistogramCounts;
private final StepBucketHistogram stepBucketHistogram;

SignalfxDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, long stepMillis, boolean isDelta) {
super(id, clock, CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig), scale, false);
this.countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0.0, count::sumThenReset, total::sumThenReset);
max = new TimeWindowMax(clock, distributionStatisticConfig);
if (distributionStatisticConfig.isPublishingHistogram() && isDelta) {
deltaHistogramCounts = new DeltaHistogramCounts();
super(id, clock, distributionStatisticConfig, scale, stepMillis, defaultHistogram(clock,
CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig, isDelta), false));

double[] slo = distributionStatisticConfig.getServiceLevelObjectiveBoundaries();
if (slo != null && slo.length > 0 && isDelta) {
stepBucketHistogram = new StepBucketHistogram(clock, stepMillis,
DistributionStatisticConfig.builder()
.serviceLevelObjectives(CumulativeHistogramConfigUtil.addPositiveInfBucket(slo))
.build()
.merge(distributionStatisticConfig),
false, true);
}
else {
deltaHistogramCounts = null;
stepBucketHistogram = null;
}
}

@Override
protected void recordNonNegative(double amount) {
count.increment();
total.add(amount);
max.record(amount);
if (stepBucketHistogram != null) {
stepBucketHistogram.recordDouble(amount);
}
super.recordNonNegative(amount);
}

@Override
public long count() {
return countTotal.poll1();
}

@Override
public double totalAmount() {
return countTotal.poll2();
}

@Override
public double max() {
return max.poll();
if (stepBucketHistogram != null) {
// Force the stepBucketHistogram to be aligned to step by calling count. This
// ensures that all
// values exported by the Timer are measured for the same step.
stepBucketHistogram.poll();
}
return super.count();
}

@Override
public HistogramSnapshot takeSnapshot() {
HistogramSnapshot currentSnapshot = super.takeSnapshot();
if (deltaHistogramCounts == null) {
if (stepBucketHistogram == null) {
return currentSnapshot;
}
return new HistogramSnapshot(currentSnapshot.count(), // Already delta in sfx
// implementation.
currentSnapshot.total(), // Already delta in sfx implementation.
currentSnapshot.max(), // Max cannot be calculated as delta, keep the
// current.
currentSnapshot.percentileValues(), // No changes to the percentile
// values.
deltaHistogramCounts.calculate(currentSnapshot.histogramCounts()), currentSnapshot::outputSummary);
return new HistogramSnapshot(currentSnapshot.count(), currentSnapshot.total(), currentSnapshot.max(),
currentSnapshot.percentileValues(), stepBucketHistogram.poll(), currentSnapshot::outputSummary);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,91 +16,75 @@
package io.micrometer.signalfx;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.AbstractTimer;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.micrometer.core.instrument.distribution.StepBucketHistogram;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepTuple2;
import io.micrometer.core.instrument.util.TimeUtils;
import io.micrometer.core.instrument.step.StepTimer;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/**
* This class is mostly the same as {@link io.micrometer.core.instrument.step.StepTimer},
* with one notable difference: the {@link DistributionStatisticConfig} is modified before
* being passed to the super class constructor - that forces the histogram generated by
* this meter to be cumulative.
* A StepTimer which provides support for multiple flavours of Histograms to be recorded
* based on {@link SignalFxConfig#publishCumulativeHistogram()} and
* {@link SignalFxConfig#publishDeltaHistogram()}.
*
* @author Bogdan Drutu
* @author Mateusz Rzeszutek
* @author Lenin Jaganathan
*/
final class SignalfxTimer extends AbstractTimer {

private final LongAdder count = new LongAdder();

private final LongAdder total = new LongAdder();

private final StepTuple2<Long, Long> countTotal;

private final TimeWindowMax max;
final class SignalfxTimer extends StepTimer {

@Nullable
private final DeltaHistogramCounts deltaHistogramCounts;
private final StepBucketHistogram stepBucketHistogram;

SignalfxTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepMillis, boolean isDelta) {
super(id, clock, CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig), pauseDetector,
baseTimeUnit, false);
countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0L, count::sumThenReset, total::sumThenReset);
max = new TimeWindowMax(clock, distributionStatisticConfig);
if (distributionStatisticConfig.isPublishingHistogram() && isDelta) {
deltaHistogramCounts = new DeltaHistogramCounts();
super(id, clock, distributionStatisticConfig, pauseDetector, baseTimeUnit, stepMillis, defaultHistogram(clock,
CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig, isDelta), false));

double[] slo = distributionStatisticConfig.getServiceLevelObjectiveBoundaries();
if (slo != null && slo.length > 0 && isDelta) {
stepBucketHistogram = new StepBucketHistogram(clock, stepMillis,
DistributionStatisticConfig.builder()
.serviceLevelObjectives(CumulativeHistogramConfigUtil.addPositiveInfBucket(slo))
.build()
.merge(distributionStatisticConfig),
false, true);
}
else {
deltaHistogramCounts = null;
stepBucketHistogram = null;
}
}

@Override
protected void recordNonNegative(long amount, TimeUnit unit) {
final long nanoAmount = (long) TimeUtils.convert(amount, unit, TimeUnit.NANOSECONDS);
count.increment();
total.add(nanoAmount);
max.record(amount, unit);
if (stepBucketHistogram != null) {
stepBucketHistogram.recordLong(TimeUnit.NANOSECONDS.convert(amount, unit));
}
super.recordNonNegative(amount, unit);
}

@Override
public long count() {
return countTotal.poll1();
}

@Override
public double totalTime(TimeUnit unit) {
return TimeUtils.nanosToUnit(countTotal.poll2(), unit);
}

@Override
public double max(TimeUnit unit) {
return max.poll(unit);
if (stepBucketHistogram != null) {
// Force the stepBucketHistogram to be aligned to step by calling count. This
// ensures that all
// values exported by the Timer are measured for the same step.
stepBucketHistogram.poll();
}
return super.count();
}

@Override
public HistogramSnapshot takeSnapshot() {
HistogramSnapshot currentSnapshot = super.takeSnapshot();
if (deltaHistogramCounts == null) {
if (stepBucketHistogram == null) {
return currentSnapshot;
}
return new HistogramSnapshot(currentSnapshot.count(), // Already delta in sfx
// implementation
currentSnapshot.total(), // Already delta in sfx implementation
currentSnapshot.max(), // Max cannot be calculated as delta, keep the
// current.
currentSnapshot.percentileValues(), // No changes to the percentile
// values.
deltaHistogramCounts.calculate(currentSnapshot.histogramCounts()), currentSnapshot::outputSummary);
return new HistogramSnapshot(currentSnapshot.count(), currentSnapshot.total(), currentSnapshot.max(),
currentSnapshot.percentileValues(), stepBucketHistogram.poll(), currentSnapshot::outputSummary);
}

}

This file was deleted.

0 comments on commit 03a227a

Please sign in to comment.