Skip to content

Commit

Permalink
Revert "Changes how MarshlandPool works with inner transactions"
Browse files Browse the repository at this point in the history
This reverts commit a966220.
  • Loading branch information
tinwelint committed Sep 18, 2017
1 parent 2b87fb4 commit fe603c7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 140 deletions.
Expand Up @@ -63,8 +63,8 @@ protected LocalSlot<T> initialValue()
};

// Used to reclaim objects from dead threads
private final Set<LocalSlotReference<T>> slotReferences =
newSetFromMap( new ConcurrentHashMap<LocalSlotReference<T>, Boolean>() );
private final Set<LocalSlotReference> slotReferences =
newSetFromMap( new ConcurrentHashMap<LocalSlotReference, Boolean>() );
private final ReferenceQueue<LocalSlot<T>> objectsFromDeadThreads = new ReferenceQueue<>();

public MarshlandPool( Factory<T> objectFactory )
Expand All @@ -84,22 +84,22 @@ public T acquire()
LocalSlot<T> localSlot = puddle.get();

T object = localSlot.object;
if ( object != null && localSlot.acquire() )
if ( object != null )
{
localSlot.set( null );
return object;
}

// Try the reference queue, containing objects from dead threads
@SuppressWarnings( "unchecked" )
LocalSlotReference<T> slotReference = (LocalSlotReference<T>) objectsFromDeadThreads.poll();
LocalSlotReference<T> slotReference = (LocalSlotReference) objectsFromDeadThreads.poll();
if ( slotReference != null && slotReference.object != null )
{
slotReferences.remove( slotReference );
return localSlot.assignIfNotAssigned( slotReference.object );
return slotReference.object;
}

// Fall back to the delegate pool
return localSlot.assignIfNotAssigned( pool.acquire() );
return pool.acquire();
}

@Override
Expand All @@ -108,35 +108,40 @@ public void release( T obj )
// Return it locally if possible
LocalSlot<T> localSlot = puddle.get();

if ( !localSlot.release( obj ) )
{ // Fall back to the delegate pool
if ( localSlot.object == null )
{
localSlot.set( obj );
}

// Fall back to the delegate pool
else
{
pool.release( obj );
}
// else it was released back into the slot
}

/**
* Dispose of all objects in this pool, releasing them back to the delegate pool
*/
@SuppressWarnings( "unchecked" )
public void disposeAll()
{
for ( LocalSlotReference<T> slotReference : slotReferences )
for ( LocalSlotReference slotReference : slotReferences )
{
LocalSlot<T> slot = slotReference.get();
LocalSlot<T> slot = (LocalSlot) slotReference.get();
if ( slot != null )
{
T obj = slot.clear();
T obj = slot.object;
if ( obj != null )
{
slot.set( null );
pool.release( obj );
}
}
}

for ( LocalSlotReference<T> reference = (LocalSlotReference<T>) objectsFromDeadThreads.poll();
for ( LocalSlotReference<T> reference = (LocalSlotReference) objectsFromDeadThreads.poll();
reference != null;
reference = (LocalSlotReference<T>) objectsFromDeadThreads.poll() )
reference = (LocalSlotReference) objectsFromDeadThreads.poll() )
{
T instance = reference.object;
if ( instance != null )
Expand All @@ -154,11 +159,11 @@ public void close()
/**
* This is used to trigger the GC to notify us whenever the thread local has been garbage collected.
*/
private static class LocalSlotReference<T> extends WeakReference<LocalSlot<T>>
private static class LocalSlotReference<T> extends WeakReference<LocalSlot>
{
private T object;

private LocalSlotReference( LocalSlot<T> referent, ReferenceQueue<? super LocalSlot<T>> q )
private LocalSlotReference( LocalSlot referent, ReferenceQueue<? super LocalSlot> q )
{
super( referent, q );
}
Expand All @@ -170,82 +175,14 @@ private LocalSlotReference( LocalSlot<T> referent, ReferenceQueue<? super LocalS
private static class LocalSlot<T>
{
private T object;
private final LocalSlotReference<T> phantomReference;
private boolean acquired;
private final LocalSlotReference phantomReference;

LocalSlot( ReferenceQueue<LocalSlot<T>> referenceQueue )
{
phantomReference = new LocalSlotReference<>( this, referenceQueue );
}

T clear()
{
T result = acquired ? null : object;
set( null );
acquired = false;
return result;
}

/**
* Will assign {@code obj} to this slot if not already assigned.
* When calling this method this slot may be in different states:
* <ul>
* <li>object = null, acquired = false: initial assignment</li>
* <li>object != null, acquired = true: already assigned, but someone has it already</li>
* </ul>
*
* @param obj instance to assign
* @return the {@code obj} for convenience for the caller
*/
T assignIfNotAssigned( T obj )
{
if ( object == null )
{
boolean wasAcquired = acquire();
assert wasAcquired;
set( obj );
}
else
{
assert acquired;
}
return obj;
}

/**
* Marks this slot as not acquired anymore. This will only succeed if the released object matches the
* object in this slot.
*
* @param obj the object to release and to match with this slot object.
* @return whether or not {@code obj} matches the slot object.
*/
boolean release( T obj )
{
if ( obj == object )
{
assert acquired;
acquired = false;
return true;
}
return false;
}

/**
* Marks this slot as acquired. Object must have been assigned at this point.
*
* @return {@code true} if this slot wasn't acquired when calling this method, otherwise {@code false}.
*/
boolean acquire()
{
if ( acquired )
{
return false;
}
acquired = true;
return true;
phantomReference = new LocalSlotReference( this, referenceQueue );
}

private void set( T obj )
public void set( T obj )
{
phantomReference.object = obj;
this.object = obj;
Expand Down
Expand Up @@ -23,10 +23,8 @@

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -58,7 +56,7 @@ public void shouldReturnToDelegatePoolIfLocalPoolIsFull() throws Exception
{
// Given
Pool<Object> delegatePool = mock( Pool.class );
when( delegatePool.acquire() ).thenReturn( 1337, 1338L, 1339L );
when(delegatePool.acquire()).thenReturn( 1337 );

final MarshlandPool<Object> pool = new MarshlandPool<>( delegatePool );

Expand Down Expand Up @@ -98,53 +96,6 @@ public void shouldReleaseAllSlotsOnClose() throws Exception
verifyNoMoreInteractions( delegatePool );
}

/*
* This test is about how the LocalSlot works with nested use, i.e. that acquiring an instance from the local slot
* and while having it (before releasing it) acquiring a "nested" instance which gets released before releasing the
* first one. The test is about how that interacts with the delegate pool and that the nested instance should not
* take precedence over the first instance in the local slot.
*/
@Test
public void shouldHaveNestedUsageFallBackToDelegatePool() throws Exception
{
// given
Pool<Integer> delegatePool = mock( Pool.class );
when( delegatePool.acquire() ).thenReturn( 5, 6 );
MarshlandPool<Integer> pool = new MarshlandPool<>( delegatePool );

// when
Integer top = pool.acquire();
assertEquals( 5, top.intValue() );
verify( delegatePool, times( 1 ) ).acquire();

// do this a couple of times, just to verify that too
int hoops = 2;
for ( int i = 1; i <= hoops; i++ )
{
// and when w/o releasing the top one, acquire a nested
Integer nested = pool.acquire();
assertEquals( 6, nested.intValue() );
verify( delegatePool, times( 1 + i ) ).acquire();

// releasing the nested
pool.release( nested );
verify( delegatePool, times( i ) ).release( nested );
}

// when finally releasing the top one
verify( delegatePool, times( hoops ) ).release( anyInt() );
pool.release( top );
verify( delegatePool, times( hoops ) ).release( anyInt() );

// then the next acquire should see the top one
verify( delegatePool, times( 1 + hoops ) ).acquire();
Integer topAgain = pool.acquire();
verify( delegatePool, times( 1 + hoops ) ).acquire();
assertEquals( 5, topAgain.intValue() );
pool.release( topAgain );
verify( delegatePool, times( hoops ) ).release( anyInt() );
}

private void assertPoolEventuallyReturns( Pool<Object> pool, int expected ) throws InterruptedException
{
long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis( 10 );
Expand Down

0 comments on commit fe603c7

Please sign in to comment.