Skip to content

Commit

Permalink
Merge branch '1.11.x' into 1.12.x
Browse files Browse the repository at this point in the history
  • Loading branch information
shakuzen committed Dec 11, 2023
2 parents fb0fc64 + 2735d26 commit b2250c5
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public class OtlpMeterRegistry extends PushMeterRegistry {

private long deltaAggregationTimeUnixNano = 0L;

// Time when the last scheduled rollOver has started. Applicable only for delta
// flavour.
private long lastMeterRolloverStartTime = -1;

@Nullable
private ScheduledExecutorService meterPollingService;

Expand Down Expand Up @@ -247,8 +251,8 @@ protected DistributionStatisticConfig defaultHistogramConfig() {
public void close() {
stop();
if (config.enabled() && isDelta() && !isClosed()) {
if (!isDataPublishedForCurrentStep() && !isPublishing()) {
// Data was not published for the current step. So, we should flush that
if (shouldPublishDataForLastStep() && !isPublishing()) {
// Data was not published for the last step. So, we should flush that
// first.
try {
publish();
Expand All @@ -267,9 +271,13 @@ else if (isPublishing()) {
super.close();
}

private boolean isDataPublishedForCurrentStep() {
return (getLastScheduledPublishStartTime() / config.step().toMillis()) == (clock.wallTime()
/ config.step().toMillis());
private boolean shouldPublishDataForLastStep() {
if (lastMeterRolloverStartTime < 0)
return false;

final long lastPublishedStep = getLastScheduledPublishStartTime() / config.step().toMillis();
final long lastPolledStep = lastMeterRolloverStartTime / config.step().toMillis();
return lastPublishedStep < lastPolledStep;
}

// Either we do this or make StepMeter public
Expand Down Expand Up @@ -346,6 +354,7 @@ private Metric writeSum(Meter meter, DoubleSupplier count) {
*/
// VisibleForTesting
void pollMetersToRollover() {
this.lastMeterRolloverStartTime = clock.wallTime();
this.getMeters()
.forEach(m -> m.match(gauge -> null, Counter::count, Timer::takeSnapshot, DistributionSummary::takeSnapshot,
meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,20 @@ void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePub
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24);
}

@Test
@Issue("#4357")
void publishOnceWhenClosedWithinFirstStep() {
// Set the initial clock time to a valid time.
MockClock mockClock = new MockClock();
mockClock.add(otlpConfig().step().multipliedBy(5));

TestOtlpMeterRegistry stepMeterRegistry = new TestOtlpMeterRegistry(otlpConfig(), mockClock);

assertThat(stepMeterRegistry.publishCount.get()).isZero();
stepMeterRegistry.close();
assertThat(stepMeterRegistry.publishCount.get()).isEqualTo(1);
}

private void assertEmptyHistogramSnapshot(HistogramSnapshot snapshot) {
assertThat(snapshot.count()).isZero();
assertThat(snapshot.total()).isZero();
Expand Down Expand Up @@ -771,6 +785,8 @@ private void assertHistogramContains(HistogramSnapshot snapshot, double total, d

private class TestOtlpMeterRegistry extends OtlpMeterRegistry {

private final AtomicInteger publishCount = new AtomicInteger();

Deque<Double> publishedCounterCounts = new ArrayDeque<>();

Deque<Long> publishedTimerCounts = new ArrayDeque<>();
Expand All @@ -795,18 +811,24 @@ private class TestOtlpMeterRegistry extends OtlpMeterRegistry {

Deque<Double> publishedFunctionTimerTotals = new ArrayDeque<>();

private long lastScheduledPublishStartTime = 0L;
private long lastScheduledPublishStartTime;

AtomicBoolean isPublishing = new AtomicBoolean(false);

CompletableFuture<Void> scheduledPublishingFuture = CompletableFuture.completedFuture(null);

TestOtlpMeterRegistry() {
super(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock);
this(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock);
}

TestOtlpMeterRegistry(OtlpConfig otlpConfig, Clock clock) {
super(otlpConfig, clock);
this.lastScheduledPublishStartTime = super.getLastScheduledPublishStartTime();
}

@Override
protected void publish() {
publishCount.incrementAndGet();
forEachMeter(meter -> meter.match(null, this::publishCounter, this::publishTimer, this::publishSummary,
null, null, this::publishFunctionCounter, this::publishFunctionTimer, null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public abstract class StepMeterRegistry extends PushMeterRegistry {
@Nullable
private ScheduledExecutorService meterPollingService;

// Time when the last scheduled rollOver has started.
private long lastMeterRolloverStartTime = -1;

public StepMeterRegistry(StepRegistryConfig config, Clock clock) {
super(config, clock);
this.config = config;
Expand Down Expand Up @@ -139,9 +142,9 @@ public void close() {
stop();

if (config.enabled() && !isClosed()) {
if (!isDataPublishedForCurrentStep() && !isPublishing()) {
// Data was not published for the current step. So, we should flush that
// first.
if (shouldPublishDataForLastStep() && !isPublishing()) {
// Data was not published for the last completed step. So, we should flush
// that first.
try {
publish();
}
Expand All @@ -159,9 +162,13 @@ else if (isPublishing()) {
super.close();
}

private boolean isDataPublishedForCurrentStep() {
return (getLastScheduledPublishStartTime() / config.step().toMillis()) == (clock.wallTime()
/ config.step().toMillis());
private boolean shouldPublishDataForLastStep() {
if (lastMeterRolloverStartTime < 0)
return false;

final long lastPublishedStep = getLastScheduledPublishStartTime() / config.step().toMillis();
final long lastPolledStep = lastMeterRolloverStartTime / config.step().toMillis();
return lastPublishedStep < lastPolledStep;
}

/**
Expand All @@ -181,6 +188,7 @@ private void closingRolloverStepMeters() {
*/
// VisibleForTesting
void pollMetersToRollover() {
this.lastMeterRolloverStartTime = clock.wallTime();
this.getMeters()
.forEach(m -> m.match(gauge -> null, Counter::count, Timer::count, DistributionSummary::count,
meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
*/
class StepMeterRegistryTest {

private final AtomicInteger publishes = new AtomicInteger();

private final MockClock clock = new MockClock();

private final StepRegistryConfig config = new StepRegistryConfig() {
Expand Down Expand Up @@ -98,9 +96,9 @@ void serviceLevelObjectivesOnlyNoPercentileHistogram() {
@Issue("#484")
@Test
void publishOneLastTimeOnClose() {
assertThat(publishes.get()).isEqualTo(0);
assertThat(registry.publishCount.get()).isZero();
registry.close();
assertThat(publishes.get()).isEqualTo(1);
assertThat(registry.publishCount.get()).isEqualTo(1);
}

@Issue("#1993")
Expand Down Expand Up @@ -425,7 +423,7 @@ void scheduledRollOver() {
}

@Test
@Issue("3914")
@Issue("#3914")
void publishShouldNotHappenWhenRegistryIsDisabled() {
StepRegistryConfig disabledStepRegistryConfig = new StepRegistryConfig() {
@Override
Expand All @@ -449,27 +447,26 @@ public String get(String key) {
Counter.builder("publish_disabled_counter").register(disabledStepMeterRegistry).increment();

clock.add(config.step());
assertThat(publishes.get()).isZero();
assertThat(disabledStepMeterRegistry.publishCount.get()).isZero();
disabledStepMeterRegistry.close();
assertThat(publishes.get()).isZero();
assertThat(disabledStepMeterRegistry.publishCount.get()).isZero();
}

@Test
@Issue("3914")
@Issue("#3914")
void publishShouldNotHappenWhenRegistryIsClosed() {
Counter.builder("my.counter").register(registry).increment();

clock.add(config.step());
assertThat(publishes.get()).isZero();
assertThat(registry.publishCount.get()).isZero();
registry.close();
assertThat(publishes.get()).isEqualTo(2);
assertThat(registry.publishedCounterCounts).hasSize(2);
assertThat(registry.publishedCounterCounts.getFirst()).isOne();
assertThat(registry.publishedCounterCounts.getLast()).isZero();
assertThat(registry.publishCount.get()).isEqualTo(1);
assertThat(registry.publishedCounterCounts).hasSize(1);

clock.add(config.step());
registry.close();
assertThat(publishes.get()).isEqualTo(2);
assertThat(registry.publishCount.get()).isEqualTo(1);
assertThat(registry.publishedCounterCounts).hasSize(1);
}

@Test
Expand Down Expand Up @@ -557,8 +554,23 @@ void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePub
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24);
}

@Test
@Issue("#4357")
void publishOnceWhenClosedWithinFirstStep() {
// Set the initial clock time to a valid time.
MockClock mockClock = new MockClock();
mockClock.add(config.step().multipliedBy(5));

MyStepMeterRegistry stepMeterRegistry = new MyStepMeterRegistry(config, mockClock);
assertThat(stepMeterRegistry.publishCount.get()).isZero();
stepMeterRegistry.close();
assertThat(stepMeterRegistry.publishCount.get()).isEqualTo(1);
}

private class MyStepMeterRegistry extends StepMeterRegistry {

private final AtomicInteger publishCount = new AtomicInteger();

Deque<Double> publishedCounterCounts = new ArrayDeque<>();

Deque<Long> publishedTimerCounts = new ArrayDeque<>();
Expand All @@ -575,7 +587,7 @@ private class MyStepMeterRegistry extends StepMeterRegistry {

Deque<Double> publishedFunctionTimerTotals = new ArrayDeque<>();

private long lastScheduledPublishStartTime = 0L;
private long lastScheduledPublishStartTime;

@Nullable
Runnable prePublishAction;
Expand All @@ -590,6 +602,7 @@ private class MyStepMeterRegistry extends StepMeterRegistry {

MyStepMeterRegistry(StepRegistryConfig config, Clock clock) {
super(config, clock);
this.lastScheduledPublishStartTime = super.getLastScheduledPublishStartTime();
}

void setPrePublishAction(Runnable prePublishAction) {
Expand All @@ -601,7 +614,7 @@ protected void publish() {
if (prePublishAction != null) {
prePublishAction.run();
}
publishes.incrementAndGet();
publishCount.incrementAndGet();
getMeters().stream()
.map(meter -> meter.match(g -> null, this::publishCounter, this::publishTimer, this::publishSummary,
null, tg -> null, this::publishFunctionCounter, this::publishFunctionTimer, m -> null))
Expand Down

0 comments on commit b2250c5

Please sign in to comment.