Skip to content

Commit

Permalink
Changes how MarshlandPool works with inner transactions
Browse files Browse the repository at this point in the history
so that the inner transaction won't take the main transaction's place
in the local slot. This has the effect of providing a more predictable
id sequence for a single thread creating entities, even when there may
be tokens created.
  • Loading branch information
tinwelint committed Sep 11, 2017
1 parent bd724c1 commit a966220
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 27 deletions.
Expand Up @@ -63,8 +63,8 @@ protected LocalSlot<T> initialValue()
};

// Used to reclaim objects from dead threads
private final Set<LocalSlotReference> slotReferences =
newSetFromMap( new ConcurrentHashMap<LocalSlotReference, Boolean>() );
private final Set<LocalSlotReference<T>> slotReferences =
newSetFromMap( new ConcurrentHashMap<LocalSlotReference<T>, Boolean>() );
private final ReferenceQueue<LocalSlot<T>> objectsFromDeadThreads = new ReferenceQueue<>();

public MarshlandPool( Factory<T> objectFactory )
Expand All @@ -84,22 +84,22 @@ public T acquire()
LocalSlot<T> localSlot = puddle.get();

T object = localSlot.object;
if ( object != null )
if ( object != null && localSlot.acquire() )
{
localSlot.set( null );
return object;
}

// Try the reference queue, containing objects from dead threads
LocalSlotReference<T> slotReference = (LocalSlotReference) objectsFromDeadThreads.poll();
@SuppressWarnings( "unchecked" )
LocalSlotReference<T> slotReference = (LocalSlotReference<T>) objectsFromDeadThreads.poll();
if ( slotReference != null && slotReference.object != null )
{
slotReferences.remove( slotReference );
return slotReference.object;
return localSlot.assignIfNotAssigned( slotReference.object );
}

// Fall back to the delegate pool
return pool.acquire();
return localSlot.assignIfNotAssigned( pool.acquire() );
}

@Override
Expand All @@ -108,40 +108,35 @@ public void release( T obj )
// Return it locally if possible
LocalSlot<T> localSlot = puddle.get();

if ( localSlot.object == null )
{
localSlot.set( obj );
}

// Fall back to the delegate pool
else
{
if ( !localSlot.release( obj ) )
{ // Fall back to the delegate pool
pool.release( obj );
}
// else it was released back into the slot
}

/**
* Dispose of all objects in this pool, releasing them back to the delegate pool
*/
@SuppressWarnings( "unchecked" )
public void disposeAll()
{
for ( LocalSlotReference slotReference : slotReferences )
for ( LocalSlotReference<T> slotReference : slotReferences )
{
LocalSlot<T> slot = (LocalSlot) slotReference.get();
LocalSlot<T> slot = slotReference.get();
if ( slot != null )
{
T obj = slot.object;
T obj = slot.clear();
if ( obj != null )
{
slot.set( null );
pool.release( obj );
}
}
}

for ( LocalSlotReference<T> reference = (LocalSlotReference) objectsFromDeadThreads.poll();
for ( LocalSlotReference<T> reference = (LocalSlotReference<T>) objectsFromDeadThreads.poll();
reference != null;
reference = (LocalSlotReference) objectsFromDeadThreads.poll() )
reference = (LocalSlotReference<T>) objectsFromDeadThreads.poll() )
{
T instance = reference.object;
if ( instance != null )
Expand All @@ -159,11 +154,11 @@ public void close()
/**
* This is used to trigger the GC to notify us whenever the thread local has been garbage collected.
*/
private static class LocalSlotReference<T> extends WeakReference<LocalSlot>
private static class LocalSlotReference<T> extends WeakReference<LocalSlot<T>>
{
private T object;

private LocalSlotReference( LocalSlot referent, ReferenceQueue<? super LocalSlot> q )
private LocalSlotReference( LocalSlot<T> referent, ReferenceQueue<? super LocalSlot<T>> q )
{
super( referent, q );
}
Expand All @@ -175,14 +170,82 @@ private LocalSlotReference( LocalSlot referent, ReferenceQueue<? super LocalSlot
private static class LocalSlot<T>
{
private T object;
private final LocalSlotReference phantomReference;
private final LocalSlotReference<T> phantomReference;
private boolean acquired;

LocalSlot( ReferenceQueue<LocalSlot<T>> referenceQueue )
{
phantomReference = new LocalSlotReference( this, referenceQueue );
phantomReference = new LocalSlotReference<>( this, referenceQueue );
}

T clear()
{
T result = acquired ? null : object;
set( null );
acquired = false;
return result;
}

/**
* Will assign {@code obj} to this slot if not already assigned.
* When calling this method this slot may be in different states:
* <ul>
* <li>object = null, acquired = false: initial assignment</li>
* <li>object != null, acquired = true: already assigned, but someone has it already</li>
* </ul>
*
* @param obj instance to assign
* @return the {@code obj} for convenience for the caller
*/
T assignIfNotAssigned( T obj )
{
if ( object == null )
{
boolean wasAcquired = acquire();
assert wasAcquired;
set( obj );
}
else
{
assert acquired;
}
return obj;
}

/**
* Marks this slot as not acquired anymore. This will only succeed if the released object matches the
* object in this slot.
*
* @param obj the object to release and to match with this slot object.
* @return whether or not {@code obj} matches the slot object.
*/
boolean release( T obj )
{
if ( obj == object )
{
assert acquired;
acquired = false;
return true;
}
return false;
}

/**
* Marks this slot as acquired. Object must have been assigned at this point.
*
* @return {@code true} if this slot wasn't acquired when calling this method, otherwise {@code false}.
*/
boolean acquire()
{
if ( acquired )
{
return false;
}
acquired = true;
return true;
}

public void set(T obj)
private void set( T obj )
{
phantomReference.object = obj;
this.object = obj;
Expand Down
Expand Up @@ -21,7 +21,9 @@

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -54,7 +56,7 @@ public void shouldReturnToDelegatePoolIfLocalPoolIsFull() throws Exception
{
// Given
Pool<Object> delegatePool = mock(Pool.class);
when(delegatePool.acquire()).thenReturn( 1337 );
when(delegatePool.acquire()).thenReturn( 1337, 1338L, 1339L );

final MarshlandPool<Object> pool = new MarshlandPool<>(delegatePool);

Expand Down Expand Up @@ -94,6 +96,53 @@ public void shouldReleaseAllSlotsOnClose() throws Exception
verifyNoMoreInteractions( delegatePool );
}

/*
* This test is about how the LocalSlot works with nested use, i.e. that acquiring an instance from the local slot
* and while having it (before releasing it) acquiring a "nested" instance which gets released before releasing the
* first one. The test is about how that interacts with the delegate pool and that the nested instance should not
* take precedence over the first instance in the local slot.
*/
@Test
public void shouldHaveNestedUsageFallBackToDelegatePool() throws Exception
{
// given
Pool<Integer> delegatePool = mock( Pool.class );
when( delegatePool.acquire() ).thenReturn( 5, 6 );
MarshlandPool<Integer> pool = new MarshlandPool<>( delegatePool );

// when
Integer top = pool.acquire();
assertEquals( 5, top.intValue() );
verify( delegatePool, times( 1 ) ).acquire();

// do this a couple of times, just to verify that too
int hoops = 2;
for ( int i = 1; i <= hoops; i++ )
{
// and when w/o releasing the top one, acquire a nested
Integer nested = pool.acquire();
assertEquals( 6, nested.intValue() );
verify( delegatePool, times( 1 + i ) ).acquire();

// releasing the nested
pool.release( nested );
verify( delegatePool, times( i ) ).release( nested );
}

// when finally releasing the top one
verify( delegatePool, times( hoops ) ).release( anyInt() );
pool.release( top );
verify( delegatePool, times( hoops ) ).release( anyInt() );

// then the next acquire should see the top one
verify( delegatePool, times( 1 + hoops ) ).acquire();
Integer topAgain = pool.acquire();
verify( delegatePool, times( 1 + hoops ) ).acquire();
assertEquals( 5, topAgain.intValue() );
pool.release( topAgain );
verify( delegatePool, times( hoops ) ).release( anyInt() );
}

private void assertPoolEventuallyReturns( Pool<Object> pool, int expected ) throws InterruptedException
{
long maxTime = System.currentTimeMillis() + 1000 * 10;
Expand Down

0 comments on commit a966220

Please sign in to comment.