Skip to content

Commit

Permalink
[#11052] Fix Caffeine Executor to not use jdk common pool
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed May 23, 2024
1 parent 03cc842 commit aebbbc3
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.navercorp.pinpoint.profiler.cache;

import com.github.benmanes.caffeine.cache.Caffeine;

public class CaffeineBuilder {

Check warning on line 5 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/cache/CaffeineBuilder.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/cache/CaffeineBuilder.java#L5

Added line #L5 was not covered by tests

static final ExecutorManager MANAGER = new ExecutorManager();

public static final int MAX_CPU = 4;


public static Caffeine<Object, Object> newBuilder() {
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
cacheBuilder.executor(MANAGER.executor());
return cacheBuilder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.navercorp.pinpoint.profiler.cache;

import com.navercorp.pinpoint.common.annotations.VisibleForTesting;
import com.navercorp.pinpoint.common.profiler.concurrent.ExecutorFactory;
import com.navercorp.pinpoint.common.util.CpuUtils;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorManager {

private volatile ExecutorService executor;

@VisibleForTesting
static ExecutorService cacheExecutor() {
// TODO Cleanup the executor when the agent is shutdown
final int nThreads = cpuCount(CpuUtils.cpuCount());
// Must set the same property as ForkJoinPool.commonPool()
return ExecutorFactory.newFixedThreadPool(nThreads, Integer.MAX_VALUE, "Caffeine", true);
}

@VisibleForTesting
static int cpuCount(int cpuCount) {
cpuCount = cpuCount - 1;
if (cpuCount <= 0) {
return 1;
}
return Math.min(cpuCount, CaffeineBuilder.MAX_CPU);
}

@VisibleForTesting
public Executor executor() {
if (executor != null) {
return executor;
}
synchronized (this) {
if (executor != null) {
return executor;

Check warning on line 39 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/cache/ExecutorManager.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/cache/ExecutorManager.java#L39

Added line #L39 was not covered by tests
}
executor = cacheExecutor();
return executor;
}
}

@VisibleForTesting
void shutdown() {
if (executor == null) {
return;
}
synchronized (this) {
if (executor == null) {
return;

Check warning on line 53 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/cache/ExecutorManager.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/cache/ExecutorManager.java#L53

Added line #L53 was not covered by tests
}
executor.shutdown();
try {
executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {

Check warning on line 58 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/cache/ExecutorManager.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/cache/ExecutorManager.java#L58

Added line #L58 was not covered by tests
}
executor = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class LRUCache<T> implements com.navercorp.pinpoint.profiler.cache.Cache<


public LRUCache(int maxCacheSize) {
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
final Caffeine<Object, Object> cacheBuilder = CaffeineBuilder.newBuilder();
cacheBuilder.initialCapacity(maxCacheSize);
cacheBuilder.maximumSize(maxCacheSize);
Cache<T, Object> localCache = cacheBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public SimpleCache(IdAllocator idAllocator, int cacheSize) {
}

private ConcurrentMap<T, Result<Integer>> createCache(int maxCacheSize) {
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
final Caffeine<Object, Object> cacheBuilder = CaffeineBuilder.newBuilder();
cacheBuilder.initialCapacity(maxCacheSize);
cacheBuilder.maximumSize(maxCacheSize);
com.github.benmanes.caffeine.cache.Cache<T, Result<Integer>> localCache = cacheBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public UidCache(int cacheSize) {
}

private ConcurrentMap<String, Result<byte[]>> createCache(int maxCacheSize) {
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
final Caffeine<Object, Object> cacheBuilder = CaffeineBuilder.newBuilder();
cacheBuilder.initialCapacity(maxCacheSize);
cacheBuilder.maximumSize(maxCacheSize);
com.github.benmanes.caffeine.cache.Cache<String, Result<byte[]>> localCache = cacheBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.navercorp.pinpoint.common.trace.BaseHistogramSchema;
import com.navercorp.pinpoint.common.trace.HistogramSchema;
import com.navercorp.pinpoint.common.trace.HistogramSlot;
import com.navercorp.pinpoint.profiler.cache.CaffeineBuilder;
import com.navercorp.pinpoint.profiler.context.id.LocalTraceRoot;
import com.navercorp.pinpoint.profiler.monitor.metric.response.ResponseTimeCollector;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -62,7 +63,7 @@ public DefaultActiveTraceRepository(ResponseTimeCollector responseTimeCollector,
}

private ConcurrentMap<ActiveTraceHandle, ActiveTrace> createCache(int maxActiveTraceSize) {
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
final Caffeine<Object, Object> cacheBuilder = CaffeineBuilder.newBuilder();
cacheBuilder.initialCapacity(maxActiveTraceSize);
cacheBuilder.maximumSize(maxActiveTraceSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.navercorp.pinpoint.profiler.cache.CaffeineBuilder;

/**
* @author jaehong.kim
Expand All @@ -38,7 +39,7 @@ public DefaultHierarchyCaches(final int size, final int entrySize) {

this.cacheEntrySize = getCacheEntrySize(entrySize);

this.caches = Caffeine.newBuilder()
this.caches = CaffeineBuilder.newBuilder()
.maximumSize(this.cacheSize)
.initialCapacity(this.cacheSize)
.build(this::loadEntry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.navercorp.pinpoint.profiler.cache;

import org.junit.jupiter.api.Test;

class CaffeineBuilderTest {


@Test
void executor_manager() {
CaffeineBuilder.newBuilder();
CaffeineBuilder.MANAGER.shutdown();
}

@Test
void executor_manager_null_check() {
CaffeineBuilder.newBuilder();
CaffeineBuilder.newBuilder();
CaffeineBuilder.MANAGER.shutdown();

CaffeineBuilder.newBuilder();
CaffeineBuilder.MANAGER.shutdown();
CaffeineBuilder.MANAGER.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.navercorp.pinpoint.profiler.cache;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ThreadPoolExecutor;

class ExecutorManagerTest {
@Test
void cpuCount() {
Assertions.assertEquals(1, ExecutorManager.cpuCount(0));

Assertions.assertEquals(1, ExecutorManager.cpuCount(1));
Assertions.assertEquals(1, ExecutorManager.cpuCount(2));
Assertions.assertEquals(2, ExecutorManager.cpuCount(3));
Assertions.assertEquals(3, ExecutorManager.cpuCount(4));

Assertions.assertEquals(4, ExecutorManager.cpuCount(5));
Assertions.assertEquals(4, ExecutorManager.cpuCount(16));
}


@Test
void cacheExecutor() {
ThreadPoolExecutor executor = (ThreadPoolExecutor) ExecutorManager.cacheExecutor();
executor.prestartAllCoreThreads();

executor.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package com.navercorp.pinpoint.profiler.cache;

import com.google.common.util.concurrent.Uninterruptibles;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -37,6 +39,11 @@ public void testPut() {
for (int i = 0; i < 1000; i++) {
cache.put(String.valueOf(random.nextInt(100000)));
}
if (cache.getSize() == cacheSize) {
return;
}
Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
cache.put("last");
Awaitility.await()
.untilAsserted(() -> assertThat(cache.getSize()).isEqualTo(cacheSize));
}
Expand Down

0 comments on commit aebbbc3

Please sign in to comment.