Skip to content

Commit

Permalink
Polish "Improve Otlp Delta Aggregation with support for max and Histo…
Browse files Browse the repository at this point in the history
  • Loading branch information
izeye committed Jun 5, 2023
1 parent ea0fda9 commit f8f9e7a
Show file tree
Hide file tree
Showing 18 changed files with 107 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.distribution.*;
import io.micrometer.core.instrument.distribution.Histogram;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.internal.DefaultGauge;
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
Expand All @@ -39,7 +40,6 @@
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Histogram;
import io.opentelemetry.proto.metrics.v1.*;
import io.opentelemetry.proto.resource.v1.Resource;

Expand Down Expand Up @@ -372,14 +372,14 @@ Metric writeHistogramSupport(HistogramSupport histogramSupport) {
.addExplicitBounds(isTimeBased ? countAtBucket.bucket(getBaseTimeUnit()) : countAtBucket.bucket());
histogramDataPoint.addBucketCounts((long) countAtBucket.count());
}
metricBuilder.setHistogram(Histogram.newBuilder()
metricBuilder.setHistogram(io.opentelemetry.proto.metrics.v1.Histogram.newBuilder()
.setAggregationTemporality(otlpAggregationTemporality)
.addDataPoints(histogramDataPoint));
return metricBuilder.build();
}

return metricBuilder
.setHistogram(Histogram.newBuilder()
.setHistogram(io.opentelemetry.proto.metrics.v1.Histogram.newBuilder()
.setAggregationTemporality(otlpAggregationTemporality)
.addDataPoints(histogramDataPoint))
.build();
Expand All @@ -388,7 +388,7 @@ Metric writeHistogramSupport(HistogramSupport histogramSupport) {
// VisibleForTesting
Metric writeFunctionTimer(FunctionTimer functionTimer) {
return getMetricBuilder(functionTimer.getId())
.setHistogram(Histogram.newBuilder()
.setHistogram(io.opentelemetry.proto.metrics.v1.Histogram.newBuilder()
.addDataPoints(HistogramDataPoint.newBuilder()
.addAllAttributes(getTagsForId(functionTimer.getId()))
.setStartTimeUnixNano(getStartTimeNanos((functionTimer)))
Expand Down Expand Up @@ -466,14 +466,13 @@ Iterable<KeyValue> getResourceAttributes() {
return attributes;
}

static io.micrometer.core.instrument.distribution.Histogram getHistogram(Clock clock,
DistributionStatisticConfig distributionStatisticConfig, AggregationTemporality aggregationTemporality) {
static Histogram getHistogram(Clock clock, DistributionStatisticConfig distributionStatisticConfig,
AggregationTemporality aggregationTemporality) {
return getHistogram(clock, distributionStatisticConfig, aggregationTemporality, 0);
}

static io.micrometer.core.instrument.distribution.Histogram getHistogram(Clock clock,
DistributionStatisticConfig distributionStatisticConfig, AggregationTemporality aggregationTemporality,
long stepMillis) {
static Histogram getHistogram(Clock clock, DistributionStatisticConfig distributionStatisticConfig,
AggregationTemporality aggregationTemporality, long stepMillis) {
// While publishing to OTLP, we export either Histogram datapoint / Summary
// datapoint. So, we will make the histogram either of them and not both.
// Though AbstractTimer/Distribution Summary prefers publishing percentiles,
Expand All @@ -488,7 +487,7 @@ static io.micrometer.core.instrument.distribution.Histogram getHistogram(Clock c
.build()
.merge(distributionStatisticConfig), true, false);
}
else if (AggregationTemporality.isDelta(aggregationTemporality) && stepMillis > 0) {
if (AggregationTemporality.isDelta(aggregationTemporality) && stepMillis > 0) {
return new OtlpStepBucketHistogram(clock, stepMillis, distributionStatisticConfig, true, false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class OtlpStepDistributionSummary extends AbstractDistributionSummary {

@Override
protected void recordNonNegative(double amount) {
count.add(1);
count.add(1L);
total.add(amount);
max.record(amount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class OtlpStepTimer extends AbstractTimer {
@Override
protected void recordNonNegative(final long amount, final TimeUnit unit) {
final long nanoAmount = (long) TimeUtils.convert(amount, unit, TimeUnit.NANOSECONDS);
count.add(1);
count.add(1L);
total.add(nanoAmount);
max.record(nanoAmount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@
import java.util.function.Supplier;

/**
* A {@link StepValue} implementation that tracks max over a Step Interval.
* A {@link StepValue} implementation that tracks max over a step interval.
*
* @author Lenin Jaganathan
* @since 1.11.0
*/
class StepMax extends StepValue<Double> {

private final DoubleAccumulator current = new DoubleAccumulator(Double::max, 0);
private final DoubleAccumulator current = new DoubleAccumulator(Double::max, 0d);

public StepMax(Clock clock, long stepMillis) {
StepMax(Clock clock, long stepMillis) {
super(clock, stepMillis);
}

Expand All @@ -45,7 +44,7 @@ protected Double noValue() {
return 0.0;
}

public void record(double value) {
void record(double value) {
current.accumulate(value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.micrometer.registry.otlp;

import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.binder.BaseUnits;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -201,7 +202,7 @@ void functionTimer() {
@Test
void distributionSummary() {
DistributionSummary size = DistributionSummary.builder("http.response.size")
.baseUnit("bytes")
.baseUnit(BaseUnits.BYTES)
.register(registry);
size.record(100);
size.record(15);
Expand All @@ -218,7 +219,7 @@ void distributionSummary() {
@Test
void distributionSummaryWithHistogramBuckets() {
DistributionSummary size = DistributionSummary.builder("http.request.size")
.baseUnit("bytes")
.baseUnit(BaseUnits.BYTES)
.publishPercentileHistogram()
.register(registry);
size.record(100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import java.time.Duration;

public class OtlpDeltaMeterRegistryCompatibilityTest extends MeterRegistryCompatibilityKit {
class OtlpDeltaMeterRegistryCompatibilityTest extends MeterRegistryCompatibilityKit {

@Override
public MeterRegistry registry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public String get(String key) {
return null;
}
};
};
}

@Test
void gauge() {
Expand Down Expand Up @@ -321,7 +321,7 @@ void testMetricsStartAndEndTime() {
Function<Meter, NumberDataPoint> getDataPoint = (meter) -> writeToMetric(meter).getSum().getDataPoints(0);
assertThat(getDataPoint.apply(counter).getStartTimeUnixNano()).isEqualTo(0);
assertThat(getDataPoint.apply(counter).getTimeUnixNano()).isEqualTo(60000000000L);
clock.addSeconds(59);
clock.addSeconds(otlpConfig().step().getSeconds() - 1);
assertThat(getDataPoint.apply(counter).getStartTimeUnixNano()).isEqualTo(0);
assertThat(getDataPoint.apply(counter).getTimeUnixNano()).isEqualTo(60000000000L);
clock.addSeconds(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,34 @@ protected AbstractDistributionSummary(Id id, Clock clock, DistributionStatisticC
this(id, scale, defaultHistogram(clock, distributionStatisticConfig, supportsAggregablePercentiles));
}

/**
* Creates an {@code AbstractDistributionSummary} instance.
* @param id meter ID
* @param scale scale
* @param histogram histogram
* @since 1.11.0
*/
protected AbstractDistributionSummary(Id id, double scale, @Nullable Histogram histogram) {
super(id);
this.scale = scale;
this.histogram = histogram == null ? NoopHistogram.INSTANCE : histogram;
}

/**
* Creates a default histogram.
* @param clock clock
* @param distributionStatisticConfig distribution statistic configuration
* @param supportsAggregablePercentiles whether to support aggregable percentiles
* @return a default histogram
* @since 1.11.0
*/
protected static Histogram defaultHistogram(Clock clock, DistributionStatisticConfig distributionStatisticConfig,
boolean supportsAggregablePercentiles) {
if (distributionStatisticConfig.isPublishingPercentiles()) {
// hdr-based histogram
return new TimeWindowPercentileHistogram(clock, distributionStatisticConfig, supportsAggregablePercentiles);
}
else if (distributionStatisticConfig.isPublishingHistogram()) {
if (distributionStatisticConfig.isPublishingHistogram()) {
// fixed boundary histograms, which have a slightly better memory footprint
// when we don't need Micrometer-computed percentiles
return new TimeWindowFixedBoundaryHistogram(clock, distributionStatisticConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ protected AbstractTimer(Id id, Clock clock, DistributionStatisticConfig distribu
defaultHistogram(clock, distributionStatisticConfig, supportsAggregablePercentiles));
}

/**
* Creates a new timer.
* @param id The timer's name and tags.
* @param clock The clock used to measure latency.
* @param pauseDetector Compensation for coordinated omission.
* @param baseTimeUnit The time scale of this timer.
* @param histogram Histogram.
* @since 1.11.0
*/
protected AbstractTimer(Id id, Clock clock, PauseDetector pauseDetector, TimeUnit baseTimeUnit,
Histogram histogram) {
super(id);
Expand All @@ -95,13 +104,23 @@ protected AbstractTimer(Id id, Clock clock, PauseDetector pauseDetector, TimeUni
this.histogram = histogram;
}

/**
* Creates a default histogram.
* @param clock The clock used to measure latency.
* @param distributionStatisticConfig Configuration determining which distribution
* statistics are sent.
* @param supportsAggregablePercentiles Indicates whether the registry supports
* percentile approximations from histograms.
* @return a default histogram
* @since 1.11.0
*/
protected static Histogram defaultHistogram(Clock clock, DistributionStatisticConfig distributionStatisticConfig,
boolean supportsAggregablePercentiles) {
if (distributionStatisticConfig.isPublishingPercentiles()) {
// hdr-based histogram
return new TimeWindowPercentileHistogram(clock, distributionStatisticConfig, supportsAggregablePercentiles);
}
else if (distributionStatisticConfig.isPublishingHistogram()) {
if (distributionStatisticConfig.isPublishingHistogram()) {
// fixed boundary histograms, which have a slightly better memory footprint
// when we don't need Micrometer-computed percentiles
return new TimeWindowFixedBoundaryHistogram(clock, distributionStatisticConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ public CumulativeDistributionSummary(Id id, Clock clock, DistributionStatisticCo
distributionStatisticConfig, supportsAggregablePercentiles));
}

/**
* Creates a {@code CumulativeDistributionSummary} instance.
* @param id meter ID
* @param clock clock
* @param distributionStatisticConfig distribution statistic configuration
* @param scale scale
* @param histogram histogram
* @since 1.11.0
*/
protected CumulativeDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, Histogram histogram) {
super(id, scale, histogram);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ public CumulativeTimer(Id id, Clock clock, DistributionStatisticConfig distribut
AbstractTimer.defaultHistogram(clock, distributionStatisticConfig, supportsAggregablePercentiles));
}

/**
* Creates a {@code CumulativeTimer} instance.
* @param id meter ID
* @param clock clock
* @param distributionStatisticConfig distribution statistic configuration
* @param pauseDetector pause detector
* @param baseTimeUnit base time unit
* @param histogram histogram
* @since 1.11.0
*/
protected CumulativeTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector, TimeUnit baseTimeUnit, Histogram histogram) {
super(id, clock, pauseDetector, baseTimeUnit, histogram);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ private void validate(DistributionStatisticConfig distributionStatisticConfig) {
}

if (config.maximumExpectedValue != null && config.maximumExpectedValue <= 0) {
rejectConfig("maximumExpectedValue (" + config.minimumExpectedValue + ") must be greater than 0.");
rejectConfig("maximumExpectedValue (" + config.maximumExpectedValue + ") must be greater than 0.");
}

if ((config.minimumExpectedValue != null && config.maximumExpectedValue != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,16 @@ void record(long value) {
/**
* The least bucket that is less than or equal to a sample.
*/
int leastLessThanOrEqualTo(long key) {
int leastLessThanOrEqualTo(double key) {
int low = 0;
int high = buckets.length - 1;

while (low <= high) {
int mid = (low + high) >>> 1;
if (buckets[mid] < key)
double value = buckets[mid];
if (value < key)
low = mid + 1;
else if (buckets[mid] > key)
else if (value > key)
high = mid - 1;
else
return mid; // exact match
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import java.util.function.Supplier;

/**
* A Histogram implementation which inherits the behaviour of Step Meters i,e this
* histogram exhibits read and reset behaviour.
* A Histogram implementation which inherits the behaviour of step meters, i.e. read and
* reset behaviour.
*
* @author Lenin Jaganathan
* @since 1.11.0
Expand Down Expand Up @@ -60,7 +60,7 @@ public void recordDouble(double value) {

@Override
public HistogramSnapshot takeSnapshot(long count, double total, double max) {
return new HistogramSnapshot(count, total, max, null, this.poll(), null);
return new HistogramSnapshot(count, total, max, null, poll(), null);
}

@Override
Expand Down Expand Up @@ -95,7 +95,9 @@ private static CountAtBucket[] getEmptyCounts(double[] buckets) {
private static double[] getBucketsFromDistributionStatisticConfig(
DistributionStatisticConfig distributionStatisticConfig, boolean supportsAggregablePercentiles) {
if (distributionStatisticConfig.getMaximumExpectedValueAsDouble() == null
|| distributionStatisticConfig.getMinimumExpectedValueAsDouble() == null) {
|| distributionStatisticConfig.getMinimumExpectedValueAsDouble() == null
|| distributionStatisticConfig.getMaximumExpectedValueAsDouble() <= 0
|| distributionStatisticConfig.getMinimumExpectedValueAsDouble() <= 0) {
throw new InvalidConfigurationException(
"minimumExpectedValue and maximumExpectedValue should be greater than 0.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void resetAccumulatedHistogram() {
}

/**
* For recording efficiency, We turn normal histogram into cumulative count histogram
* For recording efficiency, we turn normal histogram into cumulative count histogram
* only on calls to {@link #countsAtValues(Iterator<Double>)}.
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ public StepValue(final Clock clock, final long stepMillis) {
this(clock, stepMillis, null);
}

/**
* Creates a {@code StepValue} instance.
* @param clock clock
* @param stepMillis step in milliseconds
* @param initValue initial value
* @since 1.11.0
*/
protected StepValue(final Clock clock, final long stepMillis, @Nullable final V initValue) {
this.clock = clock;
this.stepMillis = stepMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ void sloWithAggregablePercentilesTrue_sloBucketPlusPercentilesHistogramBuckets()
.merge(distributionStatisticConfig);
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(), config,
supportsAggregablePercentiles, true)) {
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).contains(new CountAtBucket(slo, 0));
CountAtBucket sloBucket = new CountAtBucket(slo, 0);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).hasSizeGreaterThan(1).contains(sloBucket);
clock.add(step);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).contains(new CountAtBucket(slo, 0));
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).hasSizeGreaterThan(1).contains(sloBucket);
}
}

Expand All @@ -103,9 +104,10 @@ void sloWithPercentileHistogramFalse_onlySloBucket() {
.merge(getConfig(false));
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(), config,
supportsAggregablePercentiles, true)) {
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).contains(new CountAtBucket(slo, 0));
CountAtBucket sloBucket = new CountAtBucket(slo, 0);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).containsExactly(sloBucket);
clock.add(step);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).contains(new CountAtBucket(slo, 0));
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).containsExactly(sloBucket);
}
}

Expand Down
Loading

0 comments on commit f8f9e7a

Please sign in to comment.