Skip to content

Commit

Permalink
Fixes memory leak on JDK 16+ (#806)
Browse files Browse the repository at this point in the history
In newer versions of java, specifically 16 and over, the `ThreadLocal` can
be cleared before the Thread is killed. This breaks the assumption of
`ThreadLocalHolder` which expected the `initializeThread` method to be
executed only once per `Thread`.

This commit changes the `threadValues` to become a map that holds which
thread the value is associated with so we can avoid recreating new
values when one already exists.

Fixes #722
  • Loading branch information
caesar-ralf committed May 18, 2022
1 parent 81f8493 commit f248f22
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 135 deletions.
163 changes: 92 additions & 71 deletions src/main/java/net/logstash/logback/util/ThreadLocalHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,31 @@

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;


/**
* Maintains a per-thread value created by the {@link Supplier} given to the constructor.
*
*
* <p>A thread obtains the value by calling {@link #acquire()} and must release it after
* use by calling {@link #release()}. If the value is not released, subsequent calls to
* {@link #acquire()} will throw an {@link IllegalStateException}.
*
*
* <p>Instances value may also implement the optional {@link ThreadLocalHolder.Lifecycle}
* interface if they wish to be notified when they are recycled or disposed.
*
*
* <p>The holder keeps track of each requesting thread and takes care of disposing the
* allocated value when it dies.
*
*
* All allocated values are automatically disposed when {@link ThreadLocalHolder#close()}
* is called.
*
*
* <p>Note: This class is for internal use only and subject to backward incompatible change
* at any time.
*
*
* @param <T> type of instances returned by this {@link ThreadLocalHolder}.
*
* @author brenuart
Expand All @@ -51,92 +52,92 @@ public class ThreadLocalHolder<T> {
* The factory used to create new instances
*/
private final Supplier<T> factory;

/**
* ThreadLocal holding per-thread instances
*/
private final ThreadLocal<Holder<T>> threadLocal = ThreadLocal.withInitial(this::initializeThread);

/**
* Collection of values assigned to each thread
*/
protected final CopyOnWriteArrayList<HolderRef> threadValues = new CopyOnWriteArrayList<>(); /* visible for testing */
protected final Map<Long, HolderRef> threadValues = new ConcurrentHashMap<>(); /* visible for testing */

/**
* Reference to dead threads
*/
private final ReferenceQueue<Thread> deadThreads = new ReferenceQueue<>();

/**
* {@code true} when the {@link ThreadLocalHolder} is closed.
* When closed, values released by threads will be immediately disposed and the reference cleared.
*/
private volatile boolean closed = false;


/**
* Create a new instance of the pool.
*
*
* @param factory the factory used to create new instances.
*/
public ThreadLocalHolder(Supplier<T> factory) {
this.factory = Objects.requireNonNull(factory);
}


/**
* Get the value assigned to the current thread, creating a new one if none is assigned yet or the
* previous has been disposed.
*
*
* The value must be {@link #release()} to ensure proper life cycle before it can be {@link #acquire()}
* again.
*
*
* @return the value assigned to this thread
* @throws IllegalStateException if the value is already in use and {@link #release()} was not yet invoked.
*/
public final T acquire() {
Holder<T> holder = this.threadLocal.get();

if (holder.leased) {
throw new IllegalStateException("ThreadLocal value is already in use and not yet released.");
}

if (holder.value == null) {
holder.value = Objects.requireNonNull(createInstance());
}

holder.leased = true;
return holder.value;
}


/**
* Release the value and recycle it if possible.
*
*
* @throws IllegalStateException if the value was not previously {@link #acquire()}.
*/
public final void release() {
Holder<T> holder = this.threadLocal.get();

if (!holder.leased) {
throw new IllegalStateException("Invalid attempt at releasing a value that was not previously acquired.");
}
holder.leased = false;

/*
* Dispose value if it cannot be recycled
*/
if (this.closed || !safelyRecycleInstance(holder.value)) {
disposeHolder(holder);
}

/*
* Dispose values assigned to threads that just died
*/
processDeadThreads();
}


/**
* Close the holder and dispose all values.
* Threads are still able to {@link #acquire()} values after the holder is closed, but they will be disposed
Expand All @@ -148,39 +149,53 @@ public void close() {
* immediately instead of being recycled.
*/
this.closed = true;

/*
* Dispose value assigned to running threads.
* "inuse" values will be disposed by the owning thread when it releases it.
*/
for (HolderRef holderRef: this.threadValues) {
for (HolderRef holderRef: this.threadValues.values()) {
Holder<T> holder = holderRef.getHolder();
if (!holder.leased) {
disposeHolder(holder);
}
}
this.threadValues.clear();

/*
* Dispose values assigned to threads that just died
*/
processDeadThreads();
}


/**
* Create a new {@link Holder} and keep track of the asking thread for clearing when the thread
* is gone.
*
*
* @return a {@link Holder} assigned to the current thread.
*/
private Holder<T> initializeThread() {
Holder<T> holder = new Holder<>();
threadValues.add(new HolderRef(Thread.currentThread(), holder, deadThreads));
return holder;
final Thread currentThread = Thread.currentThread();
final long threadId = currentThread.getId();
/* Since java 16+ we can't guarantee that `initializeThread` will be called only once per thread as pools
* like the `ForkJoinPool.commonPool()` will create innocuous workers which clean their thread locals before
* executing tasks submitted to it.
*
* Because of it, we changed the strategy of `threadValues` to become also a fallback for the values that
* we expect to be associated to the Thread, i.e., it relates the value to the thread id to what we expect
* to be in the `ThreadLocal` throughout its life.
*
* See https://github.com/logfellow/logstash-logback-encoder/issues/722#issuecomment-1107836944 for more
* information.
*/
return threadValues.computeIfAbsent(
threadId,
ignore -> new HolderRef(currentThread, new Holder<>(), deadThreads))
.holder;
}


/**
* Dispose values of dead threads
*/
Expand All @@ -193,46 +208,46 @@ private void processDeadThreads() {
while (ref != null) {
Holder<T> holder = ref.getHolder();
disposeHolder(holder);
threadValues.remove(ref);
threadValues.remove(ref.getThreadId());

ref = (HolderRef) deadThreads.poll();
}
}


private void disposeHolder(Holder<T> holder) {
safelyDisposeInstance(holder.value);
holder.value = null;
}


/**
* Create a new object instance (must be non-null).
* Sub-classes may override this method to implement their own custom logic if needed.
*
*
* @return a new object instance
*/
protected T createInstance() {
return this.factory.get();
}


/**
* Dispose the object instance by calling its life cycle methods.
* Sub-classes may override this method if they wish to implement their own custom logic.
*
*
* @param instance the instance to dispose
*/
protected void disposeInstance(T instance) {
if (instance instanceof Lifecycle) {
((Lifecycle) instance).dispose();
}
}


/**
* Safely dispose the given instance, ignoring any exception that may be thrown.
*
*
* @param instance the instance to dispose
*/
private void safelyDisposeInstance(T instance) {
Expand All @@ -242,12 +257,12 @@ private void safelyDisposeInstance(T instance) {
// ignore
}
}


/**
* Recycle the instance before returning it to the pool.
* Sub-classes may override this method if they wish to implement their own custom logic.
*
*
* @param instance the instance to recycle
* @return {@code true} if the instance can be recycled and returned to the pool, {@code false} if not.
*/
Expand All @@ -258,12 +273,12 @@ protected boolean recycleInstance(T instance) {
return true;
}
}


/**
* Safely call {@link ThreadLocalHolder#recycleInstance(Object)}, ignoring exceptions but returning
* {@code false} to prevent reuse if any is thrown.
*
*
* @param instance the instance to recycle
* @return {@code true} if the instance can be recycled, {@code false} otherwise.
*/
Expand All @@ -274,8 +289,8 @@ private boolean safelyRecycleInstance(T instance) {
return false;
}
}


/**
* Optional interface that pooled instances may implement if they wish to be notified of
* life cycle events.
Expand All @@ -284,26 +299,26 @@ public interface Lifecycle {
/**
* Indicate whether the instance can be recycled and returned to the pool and perform
* the necessary recycling tasks.
*
*
* @return {@code true} if the instance can be returned to the pool, {@code false} if
* it must be disposed instead.
*/
default boolean recycle() {
return true;
}

/**
* Dispose the instance and free allocated resources.
*/
default void dispose() {
// noop
}
}


/**
* Holds the value assigned to a thread together with its "inuse" state.
*
*
* This class is static as to not have a reference to the outer {@link ThreadLocalHolder}
* and prevent it from being garbage collected.
*/
Expand All @@ -313,31 +328,37 @@ private static class Holder<T> {
* May be null if none is already assigned or when the previous is disposed.
*/
private T value;

/**
* Indicate whether the instance is in use (acquired).
* Maintaining this flag helps to avoid recreating a new SoftReference every time
* the instance is released.
*/
private boolean leased;
}


/**
* A {@link WeakReference} to a thread with the {@link Holder} assigned to it.
* Used to detect the death of a thread and dispose the associated value.
*/
/* visible for testing */
protected class HolderRef extends WeakReference<Thread> {
private final Holder<T> holder;

private final long threadId;

HolderRef(Thread owningThread, Holder<T> holder, ReferenceQueue<Thread> referenceQueue) {
super(owningThread, referenceQueue);
this.threadId = owningThread.getId();
this.holder = holder;
}

public Holder<T> getHolder() {
return this.holder;
}

public long getThreadId() {
return this.threadId;
}
}
}

0 comments on commit f248f22

Please sign in to comment.