Skip to content

Commit

Permalink
Clean WeakConcurrentMap from background thread (#6240)
Browse files Browse the repository at this point in the history
Currently our `WeakConcurrentMap` is only cleaned of stale entries when
it is accessed. There is an option to clean from a background thread,
but this creates a separate thread for every map. This pr introduces a
single background thread that cleans all maps.
I removed the option to create a thread per map as we don't use it, if
there is interest I could attempt to find a way to add it back.

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
  • Loading branch information
laurit and trask committed Nov 23, 2022
1 parent 85b3644 commit 52cfafc
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@
*/
// Suppress warnings since this is vendored as-is.
@SuppressWarnings({"MissingSummary", "EqualsBrokenForNull", "FieldMissingNullable"})
abstract class AbstractWeakConcurrentMap<K, V, L> extends ReferenceQueue<K>
implements Runnable, Iterable<Map.Entry<K, V>> {
abstract class AbstractWeakConcurrentMap<K, V, L> implements Iterable<Map.Entry<K, V>> {

private static final ReferenceQueue<Object> REFERENCE_QUEUE = new ReferenceQueue<>();

final ConcurrentMap<WeakKey<K>, V> target;
private final WeakReference<ConcurrentMap<WeakKey<K>, ?>> weakTarget;

protected AbstractWeakConcurrentMap() {
this(new ConcurrentHashMap<>());
Expand All @@ -62,6 +64,7 @@ protected AbstractWeakConcurrentMap() {
*/
protected AbstractWeakConcurrentMap(ConcurrentMap<WeakKey<K>, V> target) {
this.target = target;
this.weakTarget = new WeakReference<>(target);
}

/**
Expand Down Expand Up @@ -92,7 +95,7 @@ public V get(K key) {
if (value == null) {
value = defaultValue(key);
if (value != null) {
V previousValue = target.putIfAbsent(new WeakKey<>(key, this), value);
V previousValue = target.putIfAbsent(new WeakKey<>(key, weakTarget), value);
if (previousValue != null) {
value = previousValue;
}
Expand Down Expand Up @@ -142,7 +145,7 @@ public V put(K key, V value) {
if (key == null || value == null) {
throw new NullPointerException();
}
return target.put(new WeakKey<>(key, this), value);
return target.put(new WeakKey<>(key, weakTarget), value);
}

/**
Expand All @@ -161,7 +164,7 @@ public V putIfAbsent(K key, V value) {
} finally {
resetLookupKey(lookupKey);
}
return previous == null ? target.putIfAbsent(new WeakKey<>(key, this), value) : previous;
return previous == null ? target.putIfAbsent(new WeakKey<>(key, weakTarget), value) : previous;
}

public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
Expand All @@ -176,7 +179,8 @@ public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction
resetLookupKey(lookupKey);
}
return previous == null
? target.computeIfAbsent(new WeakKey<>(key, this), ignored -> mappingFunction.apply(key))
? target.computeIfAbsent(
new WeakKey<>(key, weakTarget), ignored -> mappingFunction.apply(key))
: previous;
}

Expand All @@ -189,7 +193,7 @@ public V putIfProbablyAbsent(K key, V value) {
if (key == null || value == null) {
throw new NullPointerException();
}
return target.putIfAbsent(new WeakKey<>(key, this), value);
return target.putIfAbsent(new WeakKey<>(key, weakTarget), value);
}

/**
Expand Down Expand Up @@ -226,10 +230,17 @@ protected V defaultValue(K key) {
}

/** Cleans all unused references. */
public void expungeStaleEntries() {
public static void expungeStaleEntries() {
Reference<?> reference;
while ((reference = poll()) != null) {
target.remove(reference);
while ((reference = REFERENCE_QUEUE.poll()) != null) {
removeWeakKey((WeakKey<?>) reference);
}
}

private static void removeWeakKey(WeakKey<?> weakKey) {
ConcurrentMap<?, ?> map = weakKey.ownerRef.get();
if (map != null) {
map.remove(weakKey);
}
}

Expand All @@ -243,11 +254,11 @@ public int approximateSize() {
return target.size();
}

@Override
public void run() {
static void runCleanup() {
try {
while (!Thread.interrupted()) {
target.remove(remove());
Reference<?> reference = REFERENCE_QUEUE.remove();
removeWeakKey((WeakKey<?>) reference);
}
} catch (InterruptedException ignored) {
// do nothing
Expand Down Expand Up @@ -301,10 +312,12 @@ public String toString() {
static final class WeakKey<K> extends WeakReference<K> {

private final int hashCode;
private final WeakReference<ConcurrentMap<WeakKey<K>, ?>> ownerRef;

WeakKey(K key, ReferenceQueue<? super K> queue) {
super(key, queue);
WeakKey(K key, WeakReference<ConcurrentMap<WeakKey<K>, ?>> ownerRef) {
super(key, REFERENCE_QUEUE);
hashCode = System.identityHashCode(key);
this.ownerRef = ownerRef;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.annotation.Nullable;

Expand All @@ -50,7 +49,6 @@
"HashCodeToString",
"MissingSummary",
"UngroupedOverloads",
"ThreadPriorityCheck",
"FieldMissingNullable"
})
public class WeakConcurrentMap<K, V>
Expand All @@ -69,17 +67,10 @@ protected LookupKey<?> initialValue() {
}
};

private static final AtomicLong ID = new AtomicLong();

private final Thread thread;

private final boolean reuseKeys;

/**
* @param cleanerThread {@code true} if a thread should be started that removes stale entries.
*/
public WeakConcurrentMap(boolean cleanerThread) {
this(cleanerThread, isPersistentClassLoader(LookupKey.class.getClassLoader()));
public WeakConcurrentMap() {
this(isPersistentClassLoader(LookupKey.class.getClassLoader()));
}

/**
Expand All @@ -105,37 +96,24 @@ private static boolean isPersistentClassLoader(ClassLoader classLoader) {
}

/**
* @param cleanerThread {@code true} if a thread should be started that removes stale entries.
* @param reuseKeys {@code true} if the lookup keys should be reused via a {@link ThreadLocal}.
* Note that setting this to {@code true} may result in class loader leaks. See {@link
* #isPersistentClassLoader(ClassLoader)} for more details.
*/
public WeakConcurrentMap(boolean cleanerThread, boolean reuseKeys) {
this(cleanerThread, reuseKeys, new ConcurrentHashMap<>());
public WeakConcurrentMap(boolean reuseKeys) {
this(reuseKeys, new ConcurrentHashMap<>());
}

/**
* @param cleanerThread {@code true} if a thread should be started that removes stale entries.
* @param reuseKeys {@code true} if the lookup keys should be reused via a {@link ThreadLocal}.
* Note that setting this to {@code true} may result in class loader leaks. See {@link
* #isPersistentClassLoader(ClassLoader)} for more details.
* @param target ConcurrentMap implementation that this class wraps.
*/
public WeakConcurrentMap(
boolean cleanerThread,
boolean reuseKeys,
ConcurrentMap<AbstractWeakConcurrentMap.WeakKey<K>, V> target) {
boolean reuseKeys, ConcurrentMap<AbstractWeakConcurrentMap.WeakKey<K>, V> target) {
super(target);
this.reuseKeys = reuseKeys;
if (cleanerThread) {
thread = new Thread(this);
thread.setName("weak-ref-cleaner-" + ID.getAndIncrement());
thread.setPriority(Thread.MIN_PRIORITY);
thread.setDaemon(true);
thread.start();
} else {
thread = null;
}
}

@Override
Expand All @@ -155,13 +133,6 @@ protected void resetLookupKey(LookupKey<K> lookupKey) {
lookupKey.reset();
}

/**
* @return The cleaner thread or {@code null} if no such thread was set.
*/
public Thread getCleanerThread() {
return thread;
}

/*
* A lookup key must only be used for looking up instances within a map. For this to work, it implements an identical contract for
* hash code and equals as the WeakKey implementation. At the same time, the lookup key implementation does not extend WeakReference
Expand Down Expand Up @@ -211,10 +182,6 @@ public int hashCode() {
*/
public static class WithInlinedExpunction<K, V> extends WeakConcurrentMap<K, V> {

public WithInlinedExpunction() {
super(false);
}

@Override
public V get(K key) {
expungeStaleEntries();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.internal.cache.weaklockfree;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class WeakConcurrentMapCleaner {
private static Thread thread;

private WeakConcurrentMapCleaner() {}

public static synchronized void start() {
if (thread != null) {
return;
}

thread = new Thread(AbstractWeakConcurrentMap::runCleanup, "weak-ref-cleaner");
thread.setDaemon(true);
thread.setContextClassLoader(null);
thread.start();
}

@SuppressWarnings("Interruption")
public static synchronized void stop() {
if (thread == null) {
return;
}

thread.interrupt();
thread = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.javaagent.tooling;

import io.opentelemetry.instrumentation.api.internal.cache.weaklockfree.WeakConcurrentMapCleaner;
import io.opentelemetry.javaagent.bootstrap.AgentInitializer;
import io.opentelemetry.javaagent.bootstrap.AgentStarter;
import java.io.File;
Expand Down Expand Up @@ -84,6 +85,7 @@ private void internalStart() {
try {
loggingCustomizer.init();
AgentInstaller.installBytebuddyAgent(instrumentation);
WeakConcurrentMapCleaner.start();
} catch (Throwable t) {
// this is logged below and not rethrown to avoid logging it twice
startupError = t;
Expand Down

0 comments on commit 52cfafc

Please sign in to comment.