Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Mode: Adding 3rd and last part support for synchronous instruments - exponential histogram #6136

Merged
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0c001b8
First part of synchronous support for memory mode.
asafm Nov 19, 2023
e986511
lint + improved error message
asafm Nov 22, 2023
3672a23
Update sdk/common/src/main/java/io/opentelemetry/sdk/common/export/Me…
asafm Nov 22, 2023
8a0330e
Update sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewBui…
asafm Nov 22, 2023
9fbb0b7
PR fixes 1
asafm Nov 22, 2023
fb8358d
Merge remote-tracking branch 'origin/memory-mode-sync-instruments-par…
asafm Nov 22, 2023
e49c5b4
PR fixes 2
asafm Nov 22, 2023
3444c2c
PR fixes 3
asafm Nov 26, 2023
35cdf26
All seems good. After checking it several times
asafm Dec 7, 2023
1ef3ca8
Check gradle task fixes
asafm Dec 7, 2023
8fda642
Merge remote-tracking branch 'upstream/main' into memory-mode-sync-in…
asafm Dec 20, 2023
a021020
Added missing tests to complete the code coverage, where it made sense
asafm Dec 24, 2023
411f73a
Fixes build fails
asafm Dec 24, 2023
55c7ac4
PR fixes - first part
asafm Jan 7, 2024
bb074f4
Merge remote-tracking branch 'upstream/main' into memory-mode-sync-in…
asafm Jan 7, 2024
010022c
PR fixes - second part
asafm Jan 7, 2024
6327f9e
Linter fixed and 1 bug fix
asafm Jan 8, 2024
ccfce83
More PR fixes
asafm Jan 9, 2024
95690f1
Added sanity unit testing 2 data classes just to pass code coverage
asafm Jan 9, 2024
6a124c5
Linter fixes
asafm Jan 9, 2024
9b0d48e
Added more tests for code coverage
asafm Jan 9, 2024
c236048
(Last) part-3 of adding memory mode support for Synchronous instrumen…
asafm Jan 10, 2024
c49d2f9
Merge remote-tracking branch 'upstream/main' into memory-mode-sync-in…
asafm Jan 14, 2024
1802396
Fixed last PR comment.
asafm Jan 21, 2024
82337b5
Tiny fix
asafm Jan 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType.InstrumentTester;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType.TestInstrumentsState;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.List;
import java.util.Random;
Expand All @@ -33,8 +35,8 @@
import org.openjdk.jmh.annotations.Warmup;

/**
* Run this through {@link AsynchronousMetricStorageGarbageCollectionBenchmarkTest}, as it runs it
* embedded with the GC profiler which what this test designed for (No need for command line run)
* Run this through {@link InstrumentGarbageCollectionBenchmarkTest}, as it runs it embedded with
* the GC profiler which what this test designed for (No need for command line run)
*
* <p>This test creates 10 asynchronous counters (any asynchronous instrument will do as the code
* path is almost the same for all async instrument types), and 1000 attribute sets. Each time the
Expand All @@ -51,37 +53,46 @@
*/
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Measurement(iterations = 20, batchSize = 100)
@Measurement(iterations = 10, batchSize = 10)
@Warmup(iterations = 10, batchSize = 10)
@Fork(1)
public class AsynchronousMetricStorageGarbageCollectionBenchmark {
public class InstrumentGarbageCollectionBenchmark {

@State(value = Scope.Benchmark)
@SuppressWarnings("SystemOut")
public static class ThreadState {
private final int cardinality;
private final int countersCount;
private final int instrumentCount;
@Param public TestInstrumentType testInstrumentType;
@Param public AggregationTemporality aggregationTemporality;
@Param public MemoryMode memoryMode;
SdkMeterProvider sdkMeterProvider;
private final Random random = new Random();
List<Attributes> attributesList;
private TestInstrumentsState testInstrumentsState;
private InstrumentTester instrumentTester;

/** Creates a ThreadState. */
@SuppressWarnings("unused")
public ThreadState() {
cardinality = 1000;
countersCount = 10;
instrumentCount = 10;
}

@SuppressWarnings("SpellCheckingInspection")
@Setup
public void setup() {
public void setup()
throws NoSuchMethodException,
InvocationTargetException,
InstantiationException,
IllegalAccessException {
instrumentTester =
testInstrumentType.instrumentTesterClass.getDeclaredConstructor().newInstance();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#nit: If you change instrumentTesterClass from Class<? extends InstrumentTester> to Supplier<InstrumentTester>, and configure each element in the TestInstrumentType enum with a factory method for creating new instances of InstrumentTester:

public enum TestInstrumentType {
  ASYNC_COUNTER(AsyncCounterTester::new),
  EXPONENTIAL_HISTOGRAM(ExponentialHistogramTester::new);

  final Supplier<InstrumentTester> instrumentTesterSupplier;

  TestInstrumentType(Supplier<InstrumentTester> instrumentTesterSupplier) {
    this.instrumentTesterSupplier = instrumentTesterSupplier;
  }
 
  ..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Much better.
I did it with an abstract method since error prone recommended it, and it does seem cleaner.

I also added the ProfileBenchmark and linked to it from the test

PeriodicMetricReader metricReader =
PeriodicMetricReader.builder(
// Configure an exporter that configures the temporality and aggregation
// for the test case, but otherwise drops the data on export
new NoopMetricExporter(aggregationTemporality, Aggregation.sum(), memoryMode))
new NoopMetricExporter(
aggregationTemporality, instrumentTester.testedAggregation(), memoryMode))
// Effectively disable periodic reading so reading is only done on #flush()
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
.build();
Expand All @@ -95,18 +106,9 @@ public void setup() {
SdkMeterProviderUtil.setExemplarFilter(builder, ExemplarFilter.alwaysOff());

sdkMeterProvider = builder.build();
for (int i = 0; i < countersCount; i++) {
sdkMeterProvider
.get("meter")
.counterBuilder("counter" + i)
.buildWithCallback(
observableLongMeasurement -> {
for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
observableLongMeasurement.record(random.nextInt(10_000), attributes);
}
});
}
testInstrumentsState =
instrumentTester.buildInstruments(
instrumentCount, sdkMeterProvider, attributesList, random);
}

@TearDown
Expand All @@ -123,6 +125,8 @@ public void tearDown() {
@Benchmark
@Threads(value = 1)
public void recordAndCollect(ThreadState threadState) {
threadState.instrumentTester.recordValuesInInstruments(
threadState.testInstrumentsState, threadState.attributesList, threadState.random);
threadState.sdkMeterProvider.forceFlush().join(10, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class AsynchronousMetricStorageGarbageCollectionBenchmarkTest {
public class InstrumentGarbageCollectionBenchmarkTest {

/**
* This test validates that in {@link MemoryMode#REUSABLE_DATA}, {@link
* AsynchronousMetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely
* allocates memory which is then subsequently garbage collected. It is done so comparatively to
* {@link MemoryMode#IMMUTABLE_DATA},
* This test validates that in {@link MemoryMode#REUSABLE_DATA}, any {@link
* MetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely allocates memory
* which is then subsequently garbage collected. It is done so comparatively to {@link
* MemoryMode#IMMUTABLE_DATA},
*
* <p>It runs the JMH test {@link AsynchronousMetricStorageGarbageCollectionBenchmark} with GC
* profiler, and measures for each parameter combination the garbage collector normalized rate
* (bytes allocated per Operation).
* <p>It runs the JMH test {@link InstrumentGarbageCollectionBenchmark} with GC profiler, and
* measures for each parameter combination the garbage collector normalized rate (bytes allocated
* per Operation).
*
* <p>Memory allocations can be hidden even at an innocent foreach loop on a collection, which
* under the hood allocates an internal object O(N) times. Someone can accidentally refactor such
Expand All @@ -52,55 +52,71 @@ public void normalizedAllocationRateTest() throws RunnerException {
"true".equals(System.getenv("CI")),
"This test should only run in GitHub CI since it's long");

// Runs AsynchronousMetricStorageMemoryProfilingBenchmark
// Runs InstrumentGarbageCollectionBenchmark
// with garbage collection profiler
Options opt =
new OptionsBuilder()
.include(AsynchronousMetricStorageGarbageCollectionBenchmark.class.getSimpleName())
.include(InstrumentGarbageCollectionBenchmark.class.getSimpleName())
.addProfiler("gc")
.shouldFailOnError(true)
.jvmArgs("-Xmx1500m")
.build();
Collection<RunResult> results = new Runner(opt).run();

// Collect the normalized GC allocation rate per parameters combination
Map<String, Map<String, Double>> resultMap = new HashMap<>();
Map<String, TestInstrumentTypeResults> testInstrumentTypeResultsMap = new HashMap<>();
for (RunResult result : results) {
for (BenchmarkResult benchmarkResult : result.getBenchmarkResults()) {
BenchmarkParams benchmarkParams = benchmarkResult.getParams();

String memoryMode = benchmarkParams.getParam("memoryMode");
String aggregationTemporality = benchmarkParams.getParam("aggregationTemporality");
String testInstrumentType = benchmarkParams.getParam("testInstrumentType");
assertThat(memoryMode).isNotNull();
assertThat(aggregationTemporality).isNotNull();
assertThat(testInstrumentType).isNotNull();

Map<String, Result> secondaryResults = benchmarkResult.getSecondaryResults();
Result allocRateNorm = secondaryResults.get("gc.alloc.rate.norm");
assertThat(allocRateNorm)
.describedAs("Allocation rate in secondary results: %s", secondaryResults)
.isNotNull();

resultMap
testInstrumentTypeResultsMap
.computeIfAbsent(testInstrumentType, k -> new TestInstrumentTypeResults())
.aggregationTemporalityToMemoryModeResult
.computeIfAbsent(aggregationTemporality, k -> new HashMap<>())
.put(memoryMode, allocRateNorm.getScore());
}
}

assertThat(resultMap).hasSameSizeAs(AggregationTemporality.values());
testInstrumentTypeResultsMap.forEach(
(testInstrumentType, testInstrumentTypeResults) -> {
Map<String, Map<String, Double>> resultMap =
testInstrumentTypeResults.aggregationTemporalityToMemoryModeResult;
assertThat(resultMap).hasSameSizeAs(AggregationTemporality.values());

// Asserts that reusable data GC allocation rate is a tiny fraction of immutable data
// GC allocation rate
resultMap.forEach(
(aggregationTemporality, memoryModeToAllocRateMap) -> {
Double immutableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.IMMUTABLE_DATA.toString());
Double reusableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.REUSABLE_DATA.toString());
// Asserts that reusable data GC allocation rate is a tiny fraction of immutable data
// GC allocation rate
resultMap.forEach(
(aggregationTemporality, memoryModeToAllocRateMap) -> {
Double immutableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.IMMUTABLE_DATA.toString());
Double reusableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.REUSABLE_DATA.toString());

assertThat(immutableDataAllocRate).isNotNull().isNotZero();
assertThat(reusableDataAllocRate).isNotNull().isNotZero();
assertThat(100 - (reusableDataAllocRate / immutableDataAllocRate) * 100)
.isCloseTo(99.8, Offset.offset(2.0));
assertThat(immutableDataAllocRate).isNotNull().isNotZero();
assertThat(reusableDataAllocRate).isNotNull().isNotZero();
assertThat(100 - (reusableDataAllocRate / immutableDataAllocRate) * 100)
.describedAs(
"Aggregation temporality = %s, testInstrumentType = %s",
aggregationTemporality, testInstrumentType)
.isCloseTo(99.8, Offset.offset(2.0));
});
});
}

static class TestInstrumentTypeResults {
Map<String, Map<String, Double>> aggregationTemporalityToMemoryModeResult = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.tester.AsyncCounterTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExponentialHistogramTester;
import java.util.List;
import java.util.Random;

public enum TestInstrumentType {
ASYNC_COUNTER(AsyncCounterTester.class),
EXPONENTIAL_HISTOGRAM(ExponentialHistogramTester.class);

final Class<? extends InstrumentTester> instrumentTesterClass;

TestInstrumentType(Class<? extends InstrumentTester> instrumentTesterClass) {
this.instrumentTesterClass = instrumentTesterClass;
}

public interface InstrumentTester {
Aggregation testedAggregation();

TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random);

void recordValuesInInstruments(
TestInstrumentsState testInstrumentsState, List<Attributes> attributesList, Random random);
}

public interface TestInstrumentsState {}

public static class EmptyInstrumentsState implements TestInstrumentsState {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType.EmptyInstrumentsState;
import java.util.List;
import java.util.Random;

public class AsyncCounterTester implements TestInstrumentType.InstrumentTester {
@Override
public Aggregation testedAggregation() {
return Aggregation.sum();
}

@SuppressWarnings("ForLoopReplaceableByForEach") // This is for GC sensitivity testing: no streams
@Override
public TestInstrumentType.TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
for (int i = 0; i < instrumentCount; i++) {
sdkMeterProvider
.get("meter")
.counterBuilder("counter" + i)
.buildWithCallback(
observableLongMeasurement -> {
for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
observableLongMeasurement.record(random.nextInt(10_000), attributes);
}
});
}
return new EmptyInstrumentsState();
}

@Override
public void recordValuesInInstruments(
TestInstrumentType.TestInstrumentsState testInstrumentsState,
List<Attributes> attributesList,
Random random) {
// No need, all done via the callbacks
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType.InstrumentTester;
import java.util.List;
import java.util.Random;

public class ExponentialHistogramTester implements InstrumentTester {

static class ExponentialHistogramState implements TestInstrumentType.TestInstrumentsState {
DoubleHistogram doubleHistogram;
}

private static final int measurementsPerAttributeSet = 1_000;

@Override
public Aggregation testedAggregation() {
return Aggregation.base2ExponentialBucketHistogram();
}

@Override
public TestInstrumentType.TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
ExponentialHistogramState state = new ExponentialHistogramState();
state.doubleHistogram = sdkMeterProvider.get("meter").histogramBuilder("testhistogram").build();
return state;
}

@SuppressWarnings("ForLoopReplaceableByForEach") // This is for GC sensitivity testing: no streams
@Override
public void recordValuesInInstruments(
TestInstrumentType.TestInstrumentsState testInstrumentsState,
List<Attributes> attributesList,
Random random) {

ExponentialHistogramState state = (ExponentialHistogramState) testInstrumentsState;

for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
for (int i = 0; i < measurementsPerAttributeSet; i++) {
state.doubleHistogram.record(random.nextInt(10_000), attributes);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private Base2ExponentialHistogramIndexer(int scale) {

/** Get an indexer for the given scale. Indexers are cached and reused for performance. */
static Base2ExponentialHistogramIndexer get(int scale) {
return cache.computeIfAbsent(scale, unused -> new Base2ExponentialHistogramIndexer(scale));
return cache.computeIfAbsent(scale, Base2ExponentialHistogramIndexer::new);
}

/**
Expand Down
Loading