Skip to content

Commit

Permalink
Publish completed step values on close before scheduled publish (Step…
Browse files Browse the repository at this point in the history
…Registry) (#3864)

When a registry was closed before the scheduled publish for the current step had run, the values recorded in the previous step (which that scheduled publish would have sent) were lost. This adds logic to detect that scenario and perform an extra publish before rolling over the values of the partial step, which are then published as part of closing.

Resolves gh-3863
  • Loading branch information
lenin-jaganathan committed Jun 12, 2023
1 parent 4811b7e commit b0be362
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,29 @@ protected DistributionStatisticConfig defaultHistogramConfig() {
public void close() {
    // Closing sequence for a delta registry:
    // 1. stop() is called first.
    // 2. If no publish is currently in progress and the last scheduled publish
    // started in an earlier step than the current one, publish once so that the
    // values of the last completed step are not lost (gh-3863).
    // 3. Roll over every meter so that the values recorded in the partial step
    // become available to whatever publishing super.close() performs.
    stop();
    if (!isPublishing() && isDelta()) {
        // A registry that was disabled through its configuration must not start
        // pushing data just because it is being closed, so the extra flush is
        // performed only when publishing is enabled.
        if (config.enabled() && !isDataPublishedForCurrentStep()) {
            // Data was not published for the current step. So, we should flush that
            // first.
            try {
                this.publish();
            }
            catch (Throwable e) {
                // Best effort: a failing flush must not prevent the registry from
                // closing.
                logger.warn(
                        "Unexpected exception thrown while publishing metrics for " + getClass().getSimpleName(),
                        e);
            }
        }
        getMeters().forEach(this::closingRollover);
    }
    super.close();
}

/**
 * Tells whether the last scheduled publish started within the step that the clock is
 * currently in (or a later one). Both timestamps are reduced to a step index by
 * integer division with the configured step length in milliseconds.
 * @return {@code true} if the step index of the last scheduled publish is not behind
 * the step index of the current wall time
 */
private boolean isDataPublishedForCurrentStep() {
    final long now = clock.wallTime();
    final long stepMillis = config.step().toMillis();
    final long lastPublishStepIndex = getLastScheduledPublishStartTime() / stepMillis;
    final long currentStepIndex = now / stepMillis;
    return lastPublishStepIndex >= currentStepIndex;
}

// Either we do this or make StepMeter public
// and still call OtlpStepTimer and OtlpStepDistributionSummary separately.
private void closingRollover(Meter meter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ void finalPushHasPartialStep() {
assertThat(functionTimer.totalTime(MILLISECONDS)).isZero();

stepOverNStep(1);
registry.publish();
registry.scheduledPublish();

assertThat(registry.publishedCounterCounts).hasSize(1);
assertThat(registry.publishedCounterCounts.pop()).isOne();
Expand Down Expand Up @@ -684,6 +684,8 @@ private class TestOtlpMeterRegistry extends OtlpMeterRegistry {

Deque<Double> publishedFunctionTimerTotals = new ArrayDeque<>();

private long lastScheduledPublishStartTime = 0L;

/**
 * Creates the test registry using the configuration and clock of the enclosing
 * {@code OtlpDeltaMeterRegistryTest} instance.
 */
public TestOtlpMeterRegistry() {
super(OtlpDeltaMeterRegistryTest.this.otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock);
}
Expand All @@ -696,6 +698,16 @@ protected void publish() {
.collect(Collectors.toList());
}

/**
 * Test helper that imitates a scheduled publish: it records the current wall time as
 * the start of the last scheduled publish and then publishes.
 */
private void scheduledPublish() {
// Record the start time before publishing, as the scheduled path is expected to do.
this.lastScheduledPublishStartTime = clock.wallTime();
this.publish();
}

/**
 * Returns the start time recorded by this test registry's own
 * {@code scheduledPublish()} helper instead of the value tracked by the parent class.
 */
@Override
protected long getLastScheduledPublishStartTime() {
return lastScheduledPublishStartTime;
}

private Timer publishTimer(Timer timer) {
publishedTimerCounts.add(timer.count());
publishedTimerSumMilliseconds.add(timer.totalTime(MILLISECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public abstract class PushMeterRegistry extends MeterRegistry {

private final AtomicBoolean publishing = new AtomicBoolean(false);

private long lastScheduledPublishStartTime = 0L;

@Nullable
private ScheduledExecutorService scheduledExecutorService;

Expand All @@ -60,6 +62,7 @@ protected PushMeterRegistry(PushRegistryConfig config, Clock clock) {
// VisibleForTesting
void publishSafely() {
if (this.publishing.compareAndSet(false, true)) {
this.lastScheduledPublishStartTime = clock.wallTime();
try {
publish();
}
Expand All @@ -85,6 +88,14 @@ protected boolean isPublishing() {
return publishing.get();
}

/**
 * Returns the time when the last scheduled publish was started by
 * {@link PushMeterRegistry#publishSafely()}.
 * @return the value of {@code clock.wallTime()} captured when the most recent
 * scheduled publish started, or {@code 0} if no scheduled publish has started yet
 */
protected long getLastScheduledPublishStartTime() {
return lastScheduledPublishStartTime;
}

/**
* @deprecated Use {@link #start(ThreadFactory)} instead.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.micrometer.core.instrument.step;

import io.micrometer.common.lang.Nullable;
import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramGauges;
Expand All @@ -40,6 +42,8 @@
*/
public abstract class StepMeterRegistry extends PushMeterRegistry {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(StepMeterRegistry.class);

private final StepRegistryConfig config;

@Nullable
Expand Down Expand Up @@ -133,15 +137,41 @@ public void stop() {
/**
 * Closes this registry, flushing step data that has not been published yet.
 * <p>
 * After calling {@link #stop()}, and only if no publish is currently in progress:
 * <ol>
 * <li>if publishing is enabled and the last scheduled publish started in an earlier
 * step than the current one, {@link #publish()} is called once so that the values of
 * the last completed step are not lost;</li>
 * <li>a closing rollover is performed on all step meters so that the values recorded
 * in the partial step become available to whatever publishing {@code super.close()}
 * performs.</li>
 * </ol>
 * Any exception thrown by the extra publish is logged and does not prevent closing.
 */
@Override
public void close() {
    stop();

    if (!isPublishing()) {
        // A registry that was disabled through its configuration must not start
        // pushing data just because it is being closed, so the extra flush is
        // performed only when publishing is enabled.
        if (config.enabled() && !isDataPublishedForCurrentStep()) {
            // Data was not published for the current step. So, we should flush that
            // first.
            try {
                this.publish();
            }
            catch (Throwable e) {
                // Best effort: a failing flush must not prevent the registry from
                // closing.
                logger.warn(
                        "Unexpected exception thrown while publishing metrics for " + getClass().getSimpleName(),
                        e);
            }
        }
        closeStepMeters();
    }
    super.close();
}

/**
 * Tells whether the last scheduled publish started within the step that the clock is
 * currently in (or a later one). Both timestamps are reduced to a step index by
 * integer division with the configured step length in milliseconds.
 * @return {@code true} if the step index of the last scheduled publish is not behind
 * the step index of the current wall time
 */
private boolean isDataPublishedForCurrentStep() {
    final long now = clock.wallTime();
    final long stepMillis = config.step().toMillis();
    final long lastPublishStepIndex = getLastScheduledPublishStartTime() / stepMillis;
    final long currentStepIndex = now / stepMillis;
    return lastPublishStepIndex >= currentStepIndex;
}

/**
 * Invokes the closing rollover on every registered meter that is a {@link StepMeter};
 * meters of any other type are left untouched.
 */
private void closeStepMeters() {
    getMeters().forEach(meter -> {
        if (meter instanceof StepMeter) {
            ((StepMeter) meter)._closingRollover();
        }
    });
}

/**
* This will poll the values from meters, which will cause a roll over for Step-meters
* if past the step boundary. This gives some control over when roll over happens
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,7 @@ void shortLivedPublish() {
.register(registry);

// before step rollover
assertThat(counter.count()).isZero();
assertThat(timer.count()).isZero();
assertThat(timer.totalTime(MILLISECONDS)).isZero();
assertThat(summary.count()).isZero();
assertThat(summary.totalAmount()).isZero();
assertThat(functionCounter.count()).isZero();
assertThat(functionTimer.count()).isZero();
assertThat(functionTimer.totalTime(MILLISECONDS)).isZero();
assertBeforeRollover(counter, timer, summary, functionCounter, functionTimer);

registry.close();

Expand Down Expand Up @@ -224,17 +217,10 @@ void finalPushHasPartialStep() {
.register(registry);

// before step rollover
assertThat(counter.count()).isZero();
assertThat(timer.count()).isZero();
assertThat(timer.totalTime(MILLISECONDS)).isZero();
assertThat(summary.count()).isZero();
assertThat(summary.totalAmount()).isZero();
assertThat(functionCounter.count()).isZero();
assertThat(functionTimer.count()).isZero();
assertThat(functionTimer.totalTime(MILLISECONDS)).isZero();
assertBeforeRollover(counter, timer, summary, functionCounter, functionTimer);

clock.add(config.step());
registry.publish();
addTimeWithRolloverOnStepStart(clock, registry, config, config.step());
registry.scheduledPublish();

assertThat(registry.publishedCounterCounts).hasSize(1);
assertThat(registry.publishedCounterCounts.pop()).isOne();
Expand All @@ -254,7 +240,7 @@ void finalPushHasPartialStep() {
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(53);

// set clock to middle of second step
clock.add(config.step().dividedBy(2));
addTimeWithRolloverOnStepStart(clock, registry, config, config.step().dividedBy(2));
// record some more values in new step interval
counter.increment(2);
timer.record(6, MILLISECONDS);
Expand Down Expand Up @@ -309,14 +295,7 @@ void publishOnCloseCrossesStepBoundary() {
.register(registry);

// before rollover
assertThat(counter.count()).isZero();
assertThat(timer.count()).isZero();
assertThat(timer.totalTime(MILLISECONDS)).isZero();
assertThat(summary.count()).isZero();
assertThat(summary.totalAmount()).isZero();
assertThat(functionCounter.count()).isZero();
assertThat(functionTimer.count()).isZero();
assertThat(functionTimer.totalTime(MILLISECONDS)).isZero();
assertBeforeRollover(counter, timer, summary, functionCounter, functionTimer);

// before publishing, simulate a step boundary being crossed after forced rollover
// on close and before/during publishing
Expand All @@ -342,6 +321,57 @@ void publishOnCloseCrossesStepBoundary() {
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(53);
}

/**
 * Verifies that closing the registry after a step boundary has been crossed, but
 * before any scheduled publish has run, still publishes the values recorded in the
 * completed step in addition to the values recorded in the partial step.
 */
@Test
@Issue("#3863")
void shouldPublishLastCompletedStepWhenClosingBeforeScheduledPublish() {
Counter counter = Counter.builder("counter_3863").register(registry);
Timer timer = Timer.builder("timer_3863").register(registry);
DistributionSummary summary = DistributionSummary.builder("summary_3863").register(registry);

AtomicLong functionValue = new AtomicLong(0);
FunctionCounter functionCounter = FunctionCounter
.builder("counter.function_3863", functionValue, AtomicLong::get)
.register(registry);
FunctionTimer functionTimer = FunctionTimer
.builder("timer.function_3863", this, obj -> 3, obj -> 53, MILLISECONDS)
.register(registry);

// Recordings made in the first step.
counter.increment();
timer.record(5, MILLISECONDS);
summary.record(5);
functionValue.set(1);

// before rollover
assertBeforeRollover(counter, timer, summary, functionCounter, functionTimer);

// Cross the step boundary; the helper polls the meters for rollover at the boundary
// but does not publish anything.
addTimeWithRolloverOnStepStart(clock, registry, config, Duration.ofSeconds(60));

// All new recordings now belong to next step.
counter.increment(2);
timer.record(10, MILLISECONDS);
summary.record(10);
functionValue.incrementAndGet();

// Simulating the application close behaviour before actual publishing happens.
registry.close();

// Two publishes are expected from close(): one for the completed step and one for
// the partial step. The sums below span both publishes: counter 1 + 2, timer two
// recordings totalling 5 ms + 10 ms, summary two recordings totalling 5 + 10.
// Note that sumAllPublishedValues drains the deque it is given.
assertThat(registry.publishedCounterCounts).hasSize(2);
assertThat(registry.sumAllPublishedValues(registry.publishedCounterCounts)).isEqualTo(3);
assertThat(registry.publishedTimerCounts).hasSize(2);
assertThat(registry.sumAllPublishedValues(registry.publishedTimerCounts)).isEqualTo(2);
assertThat(registry.sumAllPublishedValues(registry.publishedTimerSumMilliseconds)).isEqualTo(15);
assertThat(registry.publishedSummaryCounts).hasSize(2);
assertThat(registry.sumAllPublishedValues(registry.publishedSummaryCounts)).isEqualTo(2);
assertThat(registry.sumAllPublishedValues(registry.publishedSummaryTotals)).isEqualTo(15);

// The function counter's backing value went from 0 to 1 and then to 2, so the
// published values must add up to 2.
assertThat(registry.publishedFunctionCounterCounts).hasSize(2);
assertThat(registry.sumAllPublishedValues(registry.publishedFunctionCounterCounts)).isEqualTo(2);

// The function timer's count and total functions are constant (3 and 53 ms), so the
// published values across both publishes must add up to exactly those numbers.
assertThat(registry.publishedFunctionTimerCounts).hasSize(2);
assertThat(registry.sumAllPublishedValues(registry.publishedFunctionTimerCounts)).isEqualTo(3);
assertThat(registry.sumAllPublishedValues(registry.publishedFunctionTimerTotals)).isEqualTo(53);
}

@Test
@Issue("#2818")
void scheduledRollOver() {
Expand All @@ -357,28 +387,17 @@ void scheduledRollOver() {
.register(registry);

// before rollover
assertThat(counter.count()).isZero();
assertThat(timer.count()).isZero();
assertThat(timer.totalTime(MILLISECONDS)).isZero();
assertThat(summary.count()).isZero();
assertThat(summary.totalAmount()).isZero();
assertThat(functionCounter.count()).isZero();
assertThat(functionTimer.count()).isZero();
assertThat(functionTimer.totalTime(MILLISECONDS)).isZero();

clock.addSeconds(60);
// simulate this being scheduled at the start of the step
registry.pollMetersToRollover();
assertBeforeRollover(counter, timer, summary, functionCounter, functionTimer);

clock.addSeconds(1);
addTimeWithRolloverOnStepStart(clock, registry, config, Duration.ofSeconds(60));
// these recordings belong to the current step and should not be published
counter.increment();
timer.record(5, MILLISECONDS);
summary.record(8);
clock.addSeconds(10);
addTimeWithRolloverOnStepStart(clock, registry, config, Duration.ofSeconds(10));

// recordings that happened in the previous step should be published
registry.publish();
registry.scheduledPublish();
assertThat(registry.publishedCounterCounts).hasSize(1);
assertThat(registry.publishedCounterCounts.pop()).isOne();
assertThat(registry.publishedTimerCounts).hasSize(1);
Expand Down Expand Up @@ -415,6 +434,8 @@ private class MyStepMeterRegistry extends StepMeterRegistry {

Deque<Double> publishedFunctionTimerTotals = new ArrayDeque<>();

private long lastScheduledPublishStartTime = 0L;

@Nullable
Runnable prePublishAction;

Expand All @@ -438,6 +459,16 @@ protected void publish() {
.collect(Collectors.toList());
}

/**
 * Test helper that imitates a scheduled publish: it records the current wall time as
 * the start of the last scheduled publish and then publishes.
 */
private void scheduledPublish() {
// Record the start time before publishing, as the scheduled path is expected to do.
this.lastScheduledPublishStartTime = clock.wallTime();
this.publish();
}

/**
 * Returns the start time recorded by this test registry's own
 * {@code scheduledPublish()} helper instead of the value tracked by the parent class.
 */
@Override
protected long getLastScheduledPublishStartTime() {
return lastScheduledPublishStartTime;
}

private Timer publishTimer(Timer timer) {
publishedTimerCounts.add(timer.count());
publishedTimerSumMilliseconds.add(timer.totalTime(MILLISECONDS));
Expand Down Expand Up @@ -466,11 +497,52 @@ private DistributionSummary publishSummary(DistributionSummary summary) {
return summary;
}

/**
 * Adds up every value held by the given deque, removing the values from the head one
 * by one. The deque is empty when this method returns.
 * @param deque values to total; it is drained by this call
 * @return the sum of all removed values as a {@code double}, or {@code 0} if the
 * deque was already empty
 */
<T extends Number> double sumAllPublishedValues(Deque<T> deque) {
    double total = 0;
    for (int remaining = deque.size(); remaining > 0; remaining--) {
        T head = deque.pop();
        total += head.doubleValue();
    }
    return total;
}

/**
 * Uses seconds as the base time unit of this test registry.
 */
@Override
protected TimeUnit getBaseTimeUnit() {
return TimeUnit.SECONDS;
}

}

/**
 * Asserts that all of the given meters still report zero for their counts and totals.
 * The tests call this before any step rollover has happened; presumably the meters
 * only expose values of a completed step, which is why recordings made so far are
 * expected to be invisible here.
 */
private static void assertBeforeRollover(final Counter counter, final Timer timer,
final DistributionSummary summary, final FunctionCounter functionCounter,
final FunctionTimer functionTimer) {
assertThat(counter.count()).isZero();
assertThat(timer.count()).isZero();
assertThat(timer.totalTime(MILLISECONDS)).isZero();
assertThat(summary.count()).isZero();
assertThat(summary.totalAmount()).isZero();
assertThat(functionCounter.count()).isZero();
assertThat(functionTimer.count()).isZero();
assertThat(functionTimer.totalTime(MILLISECONDS)).isZero();
}

/**
 * Advances the mock clock by {@code timeToAdd} while imitating a rollover that is
 * scheduled at the start of a step. If the advance reaches or passes the next step
 * boundary, the clock is first moved exactly to that boundary,
 * {@link StepMeterRegistry#pollMetersToRollover()} is invoked, and only then is the
 * remaining time added. Otherwise the clock is simply advanced. Only the first
 * boundary that is crossed triggers a poll.
 */
private void addTimeWithRolloverOnStepStart(MockClock clock, StepMeterRegistry registry, StepRegistryConfig config,
        Duration timeToAdd) {
    final long now = clock.wallTime();
    final long stepMillis = config.step().toMillis();
    final long requestedMillis = timeToAdd.toMillis();
    // Distance from "now" to the next multiple of the step length; this equals
    // ((now / step) + 1) * step - now.
    final long millisUntilBoundary = stepMillis - (now % stepMillis);

    if (requestedMillis < millisUntilBoundary) {
        // Still inside the current step: no rollover to simulate.
        clock.add(timeToAdd);
    }
    else {
        clock.add(millisUntilBoundary, MILLISECONDS);
        registry.pollMetersToRollover();
        clock.add(requestedMillis - millisUntilBoundary, MILLISECONDS);
    }
}

}

0 comments on commit b0be362

Please sign in to comment.