Skip to content

Commit

Permalink
Polish "Publish values on close in OTLP registry"
Browse files Browse the repository at this point in the history
  • Loading branch information
izeye committed Jun 14, 2023
1 parent d64277d commit 13bca51
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,16 @@ private void closingRollover(Meter meter) {
if (meter instanceof StepCounter) {
((StepCounter) meter)._closingRollover();
}
if (meter instanceof StepFunctionCounter) {
else if (meter instanceof StepFunctionCounter) {
((StepFunctionCounter<?>) meter)._closingRollover();
}
if (meter instanceof StepFunctionTimer) {
else if (meter instanceof StepFunctionTimer) {
((StepFunctionTimer<?>) meter)._closingRollover();
}
if (meter instanceof OtlpStepTimer) {
else if (meter instanceof OtlpStepTimer) {
((OtlpStepTimer) meter)._closingRollover();
}
if (meter instanceof OtlpStepDistributionSummary) {
else if (meter instanceof OtlpStepDistributionSummary) {
((OtlpStepDistributionSummary) meter)._closingRollover();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

/**
* This is an internal class not meant for general use. The only reason to have this class
* so that components in this package can call {@code _closingRollover} on
* is that components in this package can call {@code _closingRollover} on
* {@code StepBucketHistogram} and the method does not need to be public.
*/
class OtlpStepBucketHistogram extends StepBucketHistogram {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

/**
* This is an internal class not meant for general use. The only reason to have this class
* so that {@code OtlpMeterRegistry} can call {@code _closingRollover} on
* is that {@code OtlpMeterRegistry} can call {@code _closingRollover} on
* {@code StepTuple2} and the method does not need to be public.
*/
class OtlpStepTuple2<T1, T2> extends StepTuple2<T1, T2> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.micrometer.core.instrument.binder.BaseUnits;
import io.micrometer.core.instrument.distribution.CountAtBucket;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.util.TimeUtils;
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
Expand All @@ -35,7 +36,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.micrometer.registry.otlp.AggregationTemporality.DELTA;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -443,7 +443,7 @@ void scheduledRolloverDistributionSummary() {
170, 150);
}

@Issue("#1882")
@Issue("#3773")
@Test
void shortLivedPublish() {
clock.add(-1 * clock.monotonicTime() + 1, NANOSECONDS); // set clock back to 1
Expand Down Expand Up @@ -489,7 +489,7 @@ void shortLivedPublish() {
assertThat(registry.publishedTimerMaxMilliseconds.pop()).isEqualTo(5.0);
assertThat(registry.publishedTimerHistogramSnapshots).hasSize(1);
assertHistogramContains(registry.publishedTimerHistogramSnapshots.pop(), MILLISECONDS, 5.0, 5.0,
new CountAtBucket(5.0, 1.0));
new CountAtBucket(TimeUtils.millisToUnit(5.0, NANOSECONDS), 1.0));
assertThat(registry.publishedSummaryCounts).hasSize(1);
assertThat(registry.publishedSummaryCounts.pop()).isOne();
assertThat(registry.publishedSummaryTotals).hasSize(1);
Expand All @@ -507,7 +507,7 @@ void shortLivedPublish() {
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(53);
}

@Issue("#1882")
@Issue("#3773")
@Test
void finalPushHasPartialStep() {
clock.add(-1 * clock.monotonicTime() + 1, NANOSECONDS); // set clock back to 1
Expand Down Expand Up @@ -562,7 +562,7 @@ void finalPushHasPartialStep() {
assertThat(registry.publishedTimerMaxMilliseconds.pop()).isEqualTo(5.0);
assertThat(registry.publishedTimerHistogramSnapshots).hasSize(1);
assertHistogramContains(registry.publishedTimerHistogramSnapshots.pop(), MILLISECONDS, 5.0, 5.0,
new CountAtBucket(5.0, 1.0));
new CountAtBucket(TimeUtils.millisToUnit(5.0, NANOSECONDS), 1.0));
assertThat(registry.publishedSummaryCounts).hasSize(1);
assertThat(registry.publishedSummaryCounts.pop()).isOne();
assertThat(registry.publishedSummaryTotals).hasSize(1);
Expand Down Expand Up @@ -610,7 +610,7 @@ void finalPushHasPartialStep() {
assertThat(registry.publishedTimerSumMilliseconds.pop()).isEqualTo(4.0);
assertThat(registry.publishedTimerMaxMilliseconds.pop()).isEqualTo(4.0);
assertHistogramContains(registry.publishedTimerHistogramSnapshots.pop(), MILLISECONDS, 4.0, 4.0,
new CountAtBucket(4.0, 1.0));
new CountAtBucket(TimeUtils.millisToUnit(4.0, NANOSECONDS), 1.0));
assertThat(registry.publishedSummaryCounts.pop()).isOne();
assertThat(registry.publishedSummaryTotals.pop()).isEqualTo(6);
assertThat(registry.publishedSummaryMaxes.pop()).isEqualTo(6);
Expand All @@ -629,32 +629,38 @@ private void assertEmptyHistogramSnapshot(HistogramSnapshot snapshot) {
}

private void assertHistogramContains(HistogramSnapshot snapshot, TimeUnit unit, double total, double max,
CountAtBucket... countAtBuckets) {
assertThat(snapshot.count()).isEqualTo(countAtBuckets.length);
CountAtBucket... expectedCountAtBuckets) {
assertThat(snapshot.count()).isEqualTo(expectedCountAtBuckets.length);
assertThat(snapshot.total(unit)).isEqualTo(total);
assertThat(snapshot.max(unit)).isEqualTo(max);
for (int i = 0; i < snapshot.histogramCounts().length; i++) {
CountAtBucket countAtBucket = snapshot.histogramCounts()[i];
Arrays.stream(countAtBuckets)
.filter(cb -> countAtBucket.bucket(unit) == cb.bucket())
CountAtBucket[] countAtBuckets = snapshot.histogramCounts();
for (int i = 0; i < countAtBuckets.length; i++) {
CountAtBucket countAtBucket = countAtBuckets[i];
double bucket = countAtBucket.bucket(unit);
double count = countAtBucket.count();
Arrays.stream(expectedCountAtBuckets)
.filter(expectedCountAtBucket -> bucket == expectedCountAtBucket.bucket(unit))
.findFirst()
.ifPresentOrElse(cb -> assertThat(countAtBucket.count()).isEqualTo(cb.count()),
() -> assertThat(countAtBucket.count()).isZero());
.ifPresentOrElse(expectedCountAtBucket -> assertThat(count).isEqualTo(expectedCountAtBucket.count()),
() -> assertThat(count).isZero());
}
}

private void assertHistogramContains(HistogramSnapshot snapshot, double total, double max,
CountAtBucket... countAtBuckets) {
assertThat(snapshot.count()).isEqualTo(countAtBuckets.length);
CountAtBucket... expectedCountAtBuckets) {
assertThat(snapshot.count()).isEqualTo(expectedCountAtBuckets.length);
assertThat(snapshot.total()).isEqualTo(total);
assertThat(snapshot.max()).isEqualTo(max);
for (int i = 0; i < snapshot.histogramCounts().length; i++) {
CountAtBucket countAtBucket = snapshot.histogramCounts()[i];
Arrays.stream(countAtBuckets)
.filter(cb -> countAtBucket.bucket() == cb.bucket())
CountAtBucket[] countAtBuckets = snapshot.histogramCounts();
for (int i = 0; i < countAtBuckets.length; i++) {
CountAtBucket countAtBucket = countAtBuckets[i];
double bucket = countAtBucket.bucket();
double count = countAtBucket.count();
Arrays.stream(expectedCountAtBuckets)
.filter(expectedCountAtBucket -> bucket == expectedCountAtBucket.bucket())
.findFirst()
.ifPresentOrElse(cb -> assertThat(countAtBucket.count()).isEqualTo(cb.count()),
() -> assertThat(countAtBucket.count()).isZero());
.ifPresentOrElse(expectedCountAtBucket -> assertThat(count).isEqualTo(expectedCountAtBucket.count()),
() -> assertThat(count).isZero());
}
}

Expand Down Expand Up @@ -686,21 +692,19 @@ private class TestOtlpMeterRegistry extends OtlpMeterRegistry {

private long lastScheduledPublishStartTime = 0L;

public TestOtlpMeterRegistry() {
super(OtlpDeltaMeterRegistryTest.this.otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock);
TestOtlpMeterRegistry() {
super(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock);
}

@Override
protected void publish() {
getMeters().stream()
.map(meter -> meter.match(null, this::publishCounter, this::publishTimer, this::publishSummary, null,
null, this::publishFunctionCounter, this::publishFunctionTimer, null))
.collect(Collectors.toList());
forEachMeter(meter -> meter.match(null, this::publishCounter, this::publishTimer, this::publishSummary,
null, null, this::publishFunctionCounter, this::publishFunctionTimer, null));
}

private void scheduledPublish() {
this.lastScheduledPublishStartTime = clock.wallTime();
this.publish();
publish();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ private void rollCount(long now) {
* <p>
* Rolls the values regardless of the clock or current time and ensures the value will
* never roll over again after.
* @since 1.11.0
*/
protected void _closingRollover() {
// ensure rollover does not happen again
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public V poll() {
* <p>
* Rolls the values regardless of the clock or current time and ensures the value will
* never roll over again after.
* @since 1.11.0
*/
protected void _closingRollover() {
// make sure value does not roll over again if passing a step boundary
Expand Down

0 comments on commit 13bca51

Please sign in to comment.