Skip to content

Commit

Permalink
Cleanup MarshlandPool
Browse files Browse the repository at this point in the history
  • Loading branch information
klaren committed Apr 12, 2018
1 parent 230b780 commit d2ec7d0
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 58 deletions.
Expand Up @@ -79,12 +79,12 @@ class TimeoutCheckStrategy implements CheckStrategy
private long lastCheckTime;
private final LongSupplier clock;

public TimeoutCheckStrategy( long interval )
TimeoutCheckStrategy( long interval )
{
this( interval, System::currentTimeMillis );
}

public TimeoutCheckStrategy( long interval, LongSupplier clock )
TimeoutCheckStrategy( long interval, LongSupplier clock )
{
this.interval = interval;
this.lastCheckTime = clock.getAsLong();
Expand All @@ -105,10 +105,10 @@ public boolean shouldCheck()
}
}

public static final int DEFAULT_CHECK_INTERVAL = 60 * 1000;
private static final int DEFAULT_CHECK_INTERVAL = 60 * 1000;

private final Queue<R> unused = new ConcurrentLinkedQueue<>();
private final Monitor monitor;
private final Monitor<R> monitor;
private final int minSize;
private final Factory<R> factory;
private final CheckStrategy checkStrategy;
Expand All @@ -121,10 +121,10 @@ public boolean shouldCheck()
public LinkedQueuePool( int minSize, Factory<R> factory )
{
this( minSize, factory, new CheckStrategy.TimeoutCheckStrategy( DEFAULT_CHECK_INTERVAL ),
new Monitor.Adapter() );
new Monitor.Adapter<>() );
}

public LinkedQueuePool( int minSize, Factory<R> factory, CheckStrategy strategy, Monitor monitor )
public LinkedQueuePool( int minSize, Factory<R> factory, CheckStrategy strategy, Monitor<R> monitor )
{
this.minSize = minSize;
this.factory = factory;
Expand Down
Expand Up @@ -48,50 +48,48 @@ public class MarshlandPool<T> implements Pool<T>
* - If none found, use the delegate pool.
*/

private final Pool<T> pool;
private final ThreadLocal<LocalSlot<T>> puddle = new ThreadLocal<LocalSlot<T>>()
{
@Override
protected LocalSlot<T> initialValue()
{
LocalSlot<T> localSlot = new LocalSlot<>( objectsFromDeadThreads );
slotReferences.add( localSlot.slotWeakReference );
return localSlot;
}
};
private final Pool<T> delegate;

// Used to reclaim objects from dead threads
private final Set<LocalSlotReference> slotReferences = newSetFromMap( new ConcurrentHashMap<>() );
private final Set<LocalSlotReference<T>> slotReferences = newSetFromMap( new ConcurrentHashMap<>() );
private final ReferenceQueue<LocalSlot<T>> objectsFromDeadThreads = new ReferenceQueue<>();

private final ThreadLocal<LocalSlot<T>> puddle = ThreadLocal.withInitial( () ->
{
LocalSlot<T> localSlot = new LocalSlot<>( objectsFromDeadThreads );
slotReferences.add( localSlot.slotWeakReference );
return localSlot;
} );

public MarshlandPool( Pool<T> delegatePool )
{
this.pool = delegatePool;
this.delegate = delegatePool;
}

@SuppressWarnings( "unchecked" )
@Override
public T acquire()
{
// Try and get it from the thread local
LocalSlot<T> localSlot = puddle.get();

T object = localSlot.object;
if ( object != null )
T obj = localSlot.localSlotObject;
if ( obj != null )
{
localSlot.set( null );
return object;
return obj;
}

// Try the reference queue, containing objects from dead threads
LocalSlotReference<T> slotReference = (LocalSlotReference) objectsFromDeadThreads.poll();
if ( slotReference != null && slotReference.object != null )
LocalSlotReference<T> slotReference = (LocalSlotReference<T>) objectsFromDeadThreads.poll();
if ( slotReference != null && slotReference.localSlotReferenceObject != null )
{
slotReferences.remove( slotReference );
return slotReference.object;
slotReferences.remove( slotReference ); // remove from old threads
return slotReference.localSlotReferenceObject;
}

// Fall back to the delegate pool
return pool.acquire();
return delegate.acquire();
}

@Override
Expand All @@ -100,79 +98,77 @@ public void release( T obj )
// Return it locally if possible
LocalSlot<T> localSlot = puddle.get();

if ( localSlot.object == null )
if ( localSlot.localSlotObject == null )
{
localSlot.set( obj );
}

// Fall back to the delegate pool
else
else // Fall back to the delegate pool
{
pool.release( obj );
delegate.release( obj );
}
}

/**
* Dispose of all objects in this pool, releasing them back to the delegate pool
*/
@SuppressWarnings( "unchecked" )
@Override
public void close()
{
for ( LocalSlotReference slotReference : slotReferences )
for ( LocalSlotReference<T> slotReference : slotReferences )
{
LocalSlot<T> slot = (LocalSlot) slotReference.get();
LocalSlot<T> slot = slotReference.get();
if ( slot != null )
{
T obj = slot.object;
T obj = slot.localSlotObject;
if ( obj != null )
{
slot.set( null );
pool.release( obj );
delegate.release( obj );
}
}
}

for ( LocalSlotReference<T> reference = (LocalSlotReference) objectsFromDeadThreads.poll(); reference != null;
reference = (LocalSlotReference) objectsFromDeadThreads.poll() )
for ( LocalSlotReference<T> reference; (reference = (LocalSlotReference<T>) objectsFromDeadThreads.poll()) != null; )
{
T instance = reference.object;
if ( instance != null )
T obj = reference.localSlotReferenceObject;
if ( obj != null )
{
pool.release( instance );
delegate.release( obj );
}
}
}

/**
* This is used to trigger the GC to notify us whenever the thread local has been garbage collected.
* Container for the "puddle", the small local pool each thread keeps.
*/
private static class LocalSlotReference<T> extends WeakReference<LocalSlot>
private static final class LocalSlot<T>
{
private T object;
private T localSlotObject;
private final LocalSlotReference<T> slotWeakReference;

private LocalSlotReference( LocalSlot referent, ReferenceQueue<? super LocalSlot> q )
private LocalSlot( ReferenceQueue<LocalSlot<T>> referenceQueue )
{
super( referent, q );
slotWeakReference = new LocalSlotReference<>( this, referenceQueue );
}

public void set( T obj )
{
slotWeakReference.localSlotReferenceObject = obj;
this.localSlotObject = obj;
}
}

/**
* Container for the "puddle", the small local pool each thread keeps.
* This is used to trigger the GC to notify us whenever the thread local has been garbage collected.
*/
private static class LocalSlot<T>
private static final class LocalSlotReference<T> extends WeakReference<LocalSlot<T>>
{
private T object;
private final LocalSlotReference slotWeakReference;
private T localSlotReferenceObject;

LocalSlot( ReferenceQueue<LocalSlot<T>> referenceQueue )
private LocalSlotReference( LocalSlot<T> referent, ReferenceQueue<? super LocalSlot<T>> q )
{
slotWeakReference = new LocalSlotReference( this, referenceQueue );
}

public void set( T obj )
{
slotWeakReference.object = obj;
this.object = obj;
super( referent, q );
}
}
}

0 comments on commit d2ec7d0

Please sign in to comment.