Skip to content

Commit

Permalink
Memory mode: Adding support for synchronous instruments - Last Value …
Browse files Browse the repository at this point in the history
…aggregation (#6196)
  • Loading branch information
asafm committed Feb 8, 2024
1 parent c12bb73 commit c12779d
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static void main(String[] args) {
// Parameters
AggregationTemporality aggregationTemporality = AggregationTemporality.DELTA;
MemoryMode memoryMode = MemoryMode.REUSABLE_DATA;
TestInstrumentType testInstrumentType = TestInstrumentType.EXPLICIT_BUCKET;
TestInstrumentType testInstrumentType = TestInstrumentType.DOUBLE_LAST_VALUE;

InstrumentGarbageCollectionBenchmark.ThreadState benchmarkSetup =
new InstrumentGarbageCollectionBenchmark.ThreadState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.tester.AsyncCounterTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.DoubleLastValueTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.DoubleSumTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExplicitBucketHistogramTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExponentialHistogramTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.LongLastValueTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.LongSumTester;
import java.util.List;
import java.util.Random;
Expand All @@ -23,7 +25,9 @@ public enum TestInstrumentType {
EXPONENTIAL_HISTOGRAM(ExponentialHistogramTester::new),
EXPLICIT_BUCKET(ExplicitBucketHistogramTester::new),
LONG_SUM(LongSumTester::new, /* dataAllocRateReductionPercentage= */ 97.3f),
DOUBLE_SUM(DoubleSumTester::new, /* dataAllocRateReductionPercentage= */ 97.3f);
DOUBLE_SUM(DoubleSumTester::new, /* dataAllocRateReductionPercentage= */ 97.3f),
LONG_LAST_VALUE(LongLastValueTester::new, /* dataAllocRateReductionPercentage= */ 97.3f),
DOUBLE_LAST_VALUE(DoubleLastValueTester::new, /* dataAllocRateReductionPercentage= */ 97.3f);

private final Supplier<? extends InstrumentTester> instrumentTesterInitializer;
private final float dataAllocRateReductionPercentage;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType;
import java.util.List;
import java.util.Random;

public class DoubleLastValueTester implements TestInstrumentType.InstrumentTester {

@Override
public Aggregation testedAggregation() {
return Aggregation.lastValue();
}

@SuppressWarnings("ForLoopReplaceableByForEach") // This is for GC sensitivity testing: no streams
@Override
public TestInstrumentType.TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
Meter meter = sdkMeterProvider.meterBuilder("meter").build();
meter
.gaugeBuilder("test.double.last.value")
.buildWithCallback(
observableDoubleMeasurement -> {
for (int j = 0; j < attributesList.size(); j++) {
observableDoubleMeasurement.record(1.2f, attributesList.get(j));
}
});

return new TestInstrumentType.EmptyInstrumentsState();
}

@Override
public void recordValuesInInstruments(
TestInstrumentType.TestInstrumentsState testInstrumentsState,
List<Attributes> attributesList,
Random random) {
// Recording is done by the callback define above
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType;
import java.util.List;
import java.util.Random;

public class LongLastValueTester implements TestInstrumentType.InstrumentTester {

@Override
public Aggregation testedAggregation() {
return Aggregation.lastValue();
}

@SuppressWarnings({"ForLoopReplaceableByForEach", "resource"})
@Override
public TestInstrumentType.TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
Meter meter = sdkMeterProvider.meterBuilder("meter").build();
meter
.gaugeBuilder("test.long.last.value")
.ofLongs()
.buildWithCallback(
observableLongMeasurement -> {
for (int j = 0; j < attributesList.size(); j++) {
observableLongMeasurement.record(1, attributesList.get(j));
}
});

return new TestInstrumentType.EmptyInstrumentsState();
}

@Override
public void recordValuesInInstruments(
TestInstrumentType.TestInstrumentsState testInstrumentsState,
List<Attributes> attributesList,
Random random) {
// Recording is done by the callback define above
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
Expand Down Expand Up @@ -42,15 +43,17 @@
public final class DoubleLastValueAggregator
implements Aggregator<DoublePointData, DoubleExemplarData> {
private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
private final MemoryMode memoryMode;

public DoubleLastValueAggregator(
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier) {
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier, MemoryMode memoryMode) {
this.reservoirSupplier = reservoirSupplier;
this.memoryMode = memoryMode;
}

@Override
public AggregatorHandle<DoublePointData, DoubleExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
return new Handle(reservoirSupplier.get(), memoryMode);
}

@Override
Expand Down Expand Up @@ -114,8 +117,16 @@ static final class Handle extends AggregatorHandle<DoublePointData, DoubleExempl
@Nullable private static final Double DEFAULT_VALUE = null;
private final AtomicReference<Double> current = new AtomicReference<>(DEFAULT_VALUE);

private Handle(ExemplarReservoir<DoubleExemplarData> reservoir) {
// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;

private Handle(ExemplarReservoir<DoubleExemplarData> reservoir, MemoryMode memoryMode) {
super(reservoir);
if (memoryMode == MemoryMode.REUSABLE_DATA) {
reusablePoint = new MutableDoublePointData();
} else {
reusablePoint = null;
}
}

@Override
Expand All @@ -126,8 +137,14 @@ protected DoublePointData doAggregateThenMaybeReset(
List<DoubleExemplarData> exemplars,
boolean reset) {
Double value = reset ? this.current.getAndSet(DEFAULT_VALUE) : this.current.get();
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
if (reusablePoint != null) {
reusablePoint.set(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
return reusablePoint;
} else {
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
Expand Down Expand Up @@ -39,14 +40,17 @@
*/
public final class LongLastValueAggregator implements Aggregator<LongPointData, LongExemplarData> {
private final Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier;
private final MemoryMode memoryMode;

public LongLastValueAggregator(Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier) {
public LongLastValueAggregator(
Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier, MemoryMode memoryMode) {
this.reservoirSupplier = reservoirSupplier;
this.memoryMode = memoryMode;
}

@Override
public AggregatorHandle<LongPointData, LongExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
return new Handle(reservoirSupplier.get(), memoryMode);
}

@Override
Expand Down Expand Up @@ -109,8 +113,16 @@ static final class Handle extends AggregatorHandle<LongPointData, LongExemplarDa
@Nullable private static final Long DEFAULT_VALUE = null;
private final AtomicReference<Long> current = new AtomicReference<>(DEFAULT_VALUE);

Handle(ExemplarReservoir<LongExemplarData> exemplarReservoir) {
// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableLongPointData reusablePoint;

Handle(ExemplarReservoir<LongExemplarData> exemplarReservoir, MemoryMode memoryMode) {
super(exemplarReservoir);
if (memoryMode == MemoryMode.REUSABLE_DATA) {
reusablePoint = new MutableLongPointData();
} else {
reusablePoint = null;
}
}

@Override
Expand All @@ -121,8 +133,15 @@ protected LongPointData doAggregateThenMaybeReset(
List<LongExemplarData> exemplars,
boolean reset) {
Long value = reset ? this.current.getAndSet(DEFAULT_VALUE) : this.current.get();
return ImmutableLongPointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);

if (reusablePoint != null) {
reusablePoint.set(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
return reusablePoint;
} else {
return ImmutableLongPointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggr
// For the initial version we do not sample exemplars on gauges.
switch (instrumentDescriptor.getValueType()) {
case LONG:
return (Aggregator<T, U>) new LongLastValueAggregator(ExemplarReservoir::longNoSamples);
return (Aggregator<T, U>)
new LongLastValueAggregator(ExemplarReservoir::longNoSamples, memoryMode);
case DOUBLE:
return (Aggregator<T, U>) new DoubleLastValueAggregator(ExemplarReservoir::doubleNoSamples);
return (Aggregator<T, U>)
new DoubleLastValueAggregator(ExemplarReservoir::doubleNoSamples, memoryMode);
}
throw new IllegalArgumentException("Invalid instrument value type");
}
Expand Down
Loading

0 comments on commit c12779d

Please sign in to comment.