Skip to content

Commit

Permalink
Memory Mode: Adding 3rd and last part support for synchronous instrum…
Browse files Browse the repository at this point in the history
…ents - exponential histogram (#6136)

Co-authored-by: jack-berg <34418638+jack-berg@users.noreply.github.com>
  • Loading branch information
asafm and jack-berg committed Jan 25, 2024
1 parent 0e4986a commit 737dfef
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType.InstrumentTester;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType.TestInstrumentsState;
import java.time.Duration;
import java.util.List;
import java.util.Random;
Expand All @@ -33,8 +34,8 @@
import org.openjdk.jmh.annotations.Warmup;

/**
* Run this through {@link AsynchronousMetricStorageGarbageCollectionBenchmarkTest}, as it runs it
* embedded with the GC profiler which what this test designed for (No need for command line run)
* Run this through {@link InstrumentGarbageCollectionBenchmarkTest}, as it runs it embedded with
* the GC profiler which what this test designed for (No need for command line run)
*
* <p>This test creates 10 asynchronous counters (any asynchronous instrument will do as the code
* path is almost the same for all async instrument types), and 1000 attribute sets. Each time the
Expand All @@ -51,37 +52,41 @@
*/
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Measurement(iterations = 20, batchSize = 100)
@Measurement(iterations = 10, batchSize = 10)
@Warmup(iterations = 10, batchSize = 10)
@Fork(1)
public class AsynchronousMetricStorageGarbageCollectionBenchmark {
public class InstrumentGarbageCollectionBenchmark {

@State(value = Scope.Benchmark)
@SuppressWarnings("SystemOut")
public static class ThreadState {
private final int cardinality;
private final int countersCount;
private final int instrumentCount;
@Param public TestInstrumentType testInstrumentType;
@Param public AggregationTemporality aggregationTemporality;
@Param public MemoryMode memoryMode;
SdkMeterProvider sdkMeterProvider;
private final Random random = new Random();
List<Attributes> attributesList;
private TestInstrumentsState testInstrumentsState;
private InstrumentTester instrumentTester;

/** Creates a ThreadState. */
@SuppressWarnings("unused")
public ThreadState() {
cardinality = 1000;
countersCount = 10;
instrumentCount = 10;
}

@SuppressWarnings("SpellCheckingInspection")
@Setup
public void setup() {
instrumentTester = testInstrumentType.createInstrumentTester();
PeriodicMetricReader metricReader =
PeriodicMetricReader.builder(
// Configure an exporter that configures the temporality and aggregation
// for the test case, but otherwise drops the data on export
new NoopMetricExporter(aggregationTemporality, Aggregation.sum(), memoryMode))
new NoopMetricExporter(
aggregationTemporality, instrumentTester.testedAggregation(), memoryMode))
// Effectively disable periodic reading so reading is only done on #flush()
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
.build();
Expand All @@ -95,18 +100,9 @@ public void setup() {
SdkMeterProviderUtil.setExemplarFilter(builder, ExemplarFilter.alwaysOff());

sdkMeterProvider = builder.build();
for (int i = 0; i < countersCount; i++) {
sdkMeterProvider
.get("meter")
.counterBuilder("counter" + i)
.buildWithCallback(
observableLongMeasurement -> {
for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
observableLongMeasurement.record(random.nextInt(10_000), attributes);
}
});
}
testInstrumentsState =
instrumentTester.buildInstruments(
instrumentCount, sdkMeterProvider, attributesList, random);
}

@TearDown
Expand All @@ -123,6 +119,8 @@ public void tearDown() {
@Benchmark
@Threads(value = 1)
public void recordAndCollect(ThreadState threadState) {
threadState.instrumentTester.recordValuesInInstruments(
threadState.testInstrumentsState, threadState.attributesList, threadState.random);
threadState.sdkMeterProvider.forceFlush().join(10, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class AsynchronousMetricStorageGarbageCollectionBenchmarkTest {
public class InstrumentGarbageCollectionBenchmarkTest {

/**
* This test validates that in {@link MemoryMode#REUSABLE_DATA}, {@link
* AsynchronousMetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely
* allocates memory which is then subsequently garbage collected. It is done so comparatively to
* {@link MemoryMode#IMMUTABLE_DATA},
* This test validates that in {@link MemoryMode#REUSABLE_DATA}, any {@link
* MetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely allocates memory
* which is then subsequently garbage collected. It is done so comparatively to {@link
* MemoryMode#IMMUTABLE_DATA},
*
* <p>It runs the JMH test {@link AsynchronousMetricStorageGarbageCollectionBenchmark} with GC
* profiler, and measures for each parameter combination the garbage collector normalized rate
* (bytes allocated per Operation).
* <p>It runs the JMH test {@link InstrumentGarbageCollectionBenchmark} with GC profiler, and
* measures for each parameter combination the garbage collector normalized rate (bytes allocated
* per Operation).
*
* <p>Memory allocations can be hidden even at an innocent foreach loop on a collection, which
* under the hood allocates an internal object O(N) times. Someone can accidentally refactor such
Expand All @@ -52,55 +52,76 @@ public void normalizedAllocationRateTest() throws RunnerException {
"true".equals(System.getenv("CI")),
"This test should only run in GitHub CI since it's long");

// Runs AsynchronousMetricStorageMemoryProfilingBenchmark
// Runs InstrumentGarbageCollectionBenchmark
// with garbage collection profiler
Options opt =
new OptionsBuilder()
.include(AsynchronousMetricStorageGarbageCollectionBenchmark.class.getSimpleName())
.include(InstrumentGarbageCollectionBenchmark.class.getSimpleName())
.addProfiler("gc")
.shouldFailOnError(true)
.jvmArgs("-Xmx1500m")
.build();
Collection<RunResult> results = new Runner(opt).run();

// Collect the normalized GC allocation rate per parameters combination
Map<String, Map<String, Double>> resultMap = new HashMap<>();
Map<String, TestInstrumentTypeResults> testInstrumentTypeResultsMap = new HashMap<>();
for (RunResult result : results) {
for (BenchmarkResult benchmarkResult : result.getBenchmarkResults()) {
BenchmarkParams benchmarkParams = benchmarkResult.getParams();

String memoryMode = benchmarkParams.getParam("memoryMode");
String aggregationTemporality = benchmarkParams.getParam("aggregationTemporality");
String testInstrumentType = benchmarkParams.getParam("testInstrumentType");
assertThat(memoryMode).isNotNull();
assertThat(aggregationTemporality).isNotNull();
assertThat(testInstrumentType).isNotNull();

Map<String, Result> secondaryResults = benchmarkResult.getSecondaryResults();
Result allocRateNorm = secondaryResults.get("gc.alloc.rate.norm");
assertThat(allocRateNorm)
.describedAs("Allocation rate in secondary results: %s", secondaryResults)
.isNotNull();

resultMap
testInstrumentTypeResultsMap
.computeIfAbsent(testInstrumentType, k -> new TestInstrumentTypeResults())
.aggregationTemporalityToMemoryModeResult
.computeIfAbsent(aggregationTemporality, k -> new HashMap<>())
.put(memoryMode, allocRateNorm.getScore());
}
}

assertThat(resultMap).hasSameSizeAs(AggregationTemporality.values());
testInstrumentTypeResultsMap.forEach(
(testInstrumentType, testInstrumentTypeResults) -> {
Map<String, Map<String, Double>> resultMap =
testInstrumentTypeResults.aggregationTemporalityToMemoryModeResult;
assertThat(resultMap).hasSameSizeAs(AggregationTemporality.values());

// Asserts that reusable data GC allocation rate is a tiny fraction of immutable data
// GC allocation rate
resultMap.forEach(
(aggregationTemporality, memoryModeToAllocRateMap) -> {
Double immutableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.IMMUTABLE_DATA.toString());
Double reusableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.REUSABLE_DATA.toString());
// Asserts that reusable data GC allocation rate is a tiny fraction of immutable data
// GC allocation rate
resultMap.forEach(
(aggregationTemporality, memoryModeToAllocRateMap) -> {
Double immutableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.IMMUTABLE_DATA.toString());
Double reusableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.REUSABLE_DATA.toString());

assertThat(immutableDataAllocRate).isNotNull().isNotZero();
assertThat(reusableDataAllocRate).isNotNull().isNotZero();
assertThat(100 - (reusableDataAllocRate / immutableDataAllocRate) * 100)
.isCloseTo(99.8, Offset.offset(2.0));
assertThat(immutableDataAllocRate).isNotNull().isNotZero();
assertThat(reusableDataAllocRate).isNotNull().isNotZero();

// If this test suddenly fails for you this means you have changed the code in a way
// that allocates more memory than before. You can find out where, by running
// ProfileBenchmark class and looking at the flame graph. Make sure to
// set the parameters according to where it failed for.
assertThat(100 - (reusableDataAllocRate / immutableDataAllocRate) * 100)
.describedAs(
"Aggregation temporality = %s, testInstrumentType = %s",
aggregationTemporality, testInstrumentType)
.isCloseTo(99.8, Offset.offset(2.0));
});
});
}

static class TestInstrumentTypeResults {
Map<String, Map<String, Double>> aggregationTemporalityToMemoryModeResult = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;

import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;

/**
* This benchmark class is used to see memory allocation flame graphs for a single run.
*
* <p>Steps:
*
* <ol>
* <li>Follow download instructions for async-profiler, located at <a
* href="https://github.com/async-profiler/async-profiler">this location</a>
* <li>Assuming you have extracted it at /tmp/async-profiler-2.9-macos, add the following to your
* JVM arguments of your run configuration:
* <pre>
* -agentpath:/tmp/async-profiler-2.9-macos/build/libasyncProfiler.so=start,event=alloc,flamegraph,file=/tmp/profiled_data.html
* </pre>
* <li>Tune the parameters as you see fit (They are marked below with "Parameters")
* <li>Run the class (its main function)
* <li>Open /tmp/profiled_data.html with your browser
* <li>Use the flame graph to see where the allocations are happening the most and fix
* <li>Run {@link InstrumentGarbageCollectionBenchmark} and see if it passes now
* <li>If not, repeat
* </ol>
*/
public class ProfileBenchmark {

private ProfileBenchmark() {}

public static void main(String[] args) {
// Parameters
AggregationTemporality aggregationTemporality = AggregationTemporality.DELTA;
MemoryMode memoryMode = MemoryMode.REUSABLE_DATA;
TestInstrumentType testInstrumentType = TestInstrumentType.EXPONENTIAL_HISTOGRAM;

InstrumentGarbageCollectionBenchmark.ThreadState benchmarkSetup =
new InstrumentGarbageCollectionBenchmark.ThreadState();

benchmarkSetup.aggregationTemporality = aggregationTemporality;
benchmarkSetup.memoryMode = memoryMode;
benchmarkSetup.testInstrumentType = testInstrumentType;

InstrumentGarbageCollectionBenchmark benchmark = new InstrumentGarbageCollectionBenchmark();

benchmarkSetup.setup();

warmup(benchmark, benchmarkSetup);

// This is divided explicitly to two methods so you can focus on `measure` in the flame graph
// when trying to decrease the allocations
measure(benchmark, benchmarkSetup);
}

public static void warmup(
InstrumentGarbageCollectionBenchmark benchmark,
InstrumentGarbageCollectionBenchmark.ThreadState benchmarkSetup) {
for (int i = 0; i < 10; i++) {
benchmark.recordAndCollect(benchmarkSetup);
}
}

public static void measure(
InstrumentGarbageCollectionBenchmark benchmark,
InstrumentGarbageCollectionBenchmark.ThreadState benchmarkSetup) {
for (int i = 0; i < 200; i++) {
benchmark.recordAndCollect(benchmarkSetup);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.tester.AsyncCounterTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExponentialHistogramTester;
import java.util.List;
import java.util.Random;

public enum TestInstrumentType {
ASYNC_COUNTER() {
@Override
InstrumentTester createInstrumentTester() {
return new AsyncCounterTester();
}
},
EXPONENTIAL_HISTOGRAM() {
@Override
InstrumentTester createInstrumentTester() {
return new ExponentialHistogramTester();
}
};

abstract InstrumentTester createInstrumentTester();

TestInstrumentType() {}

public interface InstrumentTester {
Aggregation testedAggregation();

TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random);

void recordValuesInInstruments(
TestInstrumentsState testInstrumentsState, List<Attributes> attributesList, Random random);
}

public interface TestInstrumentsState {}

public static class EmptyInstrumentsState implements TestInstrumentsState {}
}
Loading

0 comments on commit 737dfef

Please sign in to comment.