Skip to content

Commit

Permalink
[Test Failure] ResourcePoolTest is flaky in 2.3 on Java 8.
Browse files Browse the repository at this point in the history
Update handling of lastCheckTime ResourcePool.TimeoutCheckStrategy
Cleanup of ResourcePoolTest.
  • Loading branch information
MishaDemianenko committed Aug 11, 2015
1 parent fbe1255 commit e8b1118
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 102 deletions.
18 changes: 9 additions & 9 deletions enterprise/com/src/main/java/org/neo4j/com/ResourcePool.java
Expand Up @@ -32,17 +32,17 @@ public abstract class ResourcePool<R>
{ {
public interface Monitor<R> public interface Monitor<R>
{ {
public void updatedCurrentPeakSize( int currentPeakSize ); void updatedCurrentPeakSize( int currentPeakSize );


public void updatedTargetSize( int targetSize ); void updatedTargetSize( int targetSize );


public void created( R resource ); void created( R resource );


public void acquired( R resource ); void acquired( R resource );


public void disposed( R resource ); void disposed( R resource );


public class Adapter<R> implements Monitor<R> class Adapter<R> implements Monitor<R>
{ {
@Override @Override
public void updatedCurrentPeakSize( int currentPeakSize ) public void updatedCurrentPeakSize( int currentPeakSize )
Expand Down Expand Up @@ -73,12 +73,12 @@ public void disposed( R resource )


public interface CheckStrategy public interface CheckStrategy
{ {
public boolean shouldCheck(); boolean shouldCheck();


public class TimeoutCheckStrategy implements CheckStrategy class TimeoutCheckStrategy implements CheckStrategy
{ {
private final long interval; private final long interval;
private long lastCheckTime; private volatile long lastCheckTime;
private final Clock clock; private final Clock clock;


public TimeoutCheckStrategy( long interval, Clock clock ) public TimeoutCheckStrategy( long interval, Clock clock )
Expand Down
194 changes: 101 additions & 93 deletions enterprise/com/src/test/java/org/neo4j/com/ResourcePoolTest.java
Expand Up @@ -36,6 +36,10 @@


public class ResourcePoolTest public class ResourcePoolTest
{ {

private static final int TIMEOUT_MILLIS = 100;
private static final int TIMEOUT_EXCEED_MILLIS = TIMEOUT_MILLIS + 10;

@Test @Test
public void shouldNotReuseBrokenInstances() throws Exception public void shouldNotReuseBrokenInstances() throws Exception
{ {
Expand Down Expand Up @@ -74,9 +78,9 @@ public void shouldTimeoutGracefully() throws InterruptedException
{ {
FakeClock clock = new FakeClock(); FakeClock clock = new FakeClock();


ResourcePool.CheckStrategy timeStrategy = new ResourcePool.CheckStrategy.TimeoutCheckStrategy( 100, clock ); ResourcePool.CheckStrategy timeStrategy = new ResourcePool.CheckStrategy.TimeoutCheckStrategy( TIMEOUT_MILLIS, clock );


while ( clock.currentTimeMillis() <= 100 ) while ( clock.currentTimeMillis() <= TIMEOUT_MILLIS )
{ {
assertFalse( timeStrategy.shouldCheck() ); assertFalse( timeStrategy.shouldCheck() );
clock.forward( 10, TimeUnit.MILLISECONDS ); clock.forward( 10, TimeUnit.MILLISECONDS );
Expand Down Expand Up @@ -127,22 +131,22 @@ public void shouldBuildUpGracefullyWhilePassingMinPoolSizeBeforeTimerRings() thr
public void shouldUpdateTargetSizeWhenSpikesOccur() throws Exception public void shouldUpdateTargetSizeWhenSpikesOccur() throws Exception
{ {
// given // given
final int MIN_SIZE = 5; final int poolMinSize = 5;
final int MAX_SIZE = 10; final int poolMaxSize = 10;


StatefulMonitor stateMonitor = new StatefulMonitor(); StatefulMonitor stateMonitor = new StatefulMonitor();
FakeClock clock = new FakeClock(); FakeClock clock = new FakeClock();
final ResourcePool<Something> pool = getResourcePool( stateMonitor, clock, MIN_SIZE ); final ResourcePool<Something> pool = getResourcePool( stateMonitor, clock, poolMinSize );


// when // when
List<ResourceHolder> holders = acquireFromPool( pool, MAX_SIZE ); List<ResourceHolder> holders = acquireFromPool( pool, poolMaxSize );
clock.forward( 110, TimeUnit.MILLISECONDS ); exceedTimeout( clock );
holders.addAll( acquireFromPool( pool, 1 ) ); // Needed to trigger the alarm holders.addAll( acquireFromPool( pool, 1 ) ); // Needed to trigger the alarm


// then // then
assertEquals( MAX_SIZE + 1, stateMonitor.currentPeakSize.get() ); assertEquals( poolMaxSize + 1, stateMonitor.currentPeakSize.get() );
// We have not released anything, so targetSize will not be reduced // We have not released anything, so targetSize will not be reduced
assertEquals( MAX_SIZE + 1, stateMonitor.targetSize.get() ); // + 1 from the acquire assertEquals( poolMaxSize + 1, stateMonitor.targetSize.get() ); // + 1 from the acquire


for ( ResourceHolder holder : holders ) for ( ResourceHolder holder : holders )
{ {
Expand All @@ -154,11 +158,11 @@ public void shouldUpdateTargetSizeWhenSpikesOccur() throws Exception
public void shouldKeepSmallPeakAndNeverDisposeIfAcquireAndReleaseContinuously() throws Exception public void shouldKeepSmallPeakAndNeverDisposeIfAcquireAndReleaseContinuously() throws Exception
{ {
// given // given
final int MIN_SIZE = 1; final int poolMinSize = 1;


StatefulMonitor stateMonitor = new StatefulMonitor(); StatefulMonitor stateMonitor = new StatefulMonitor();
FakeClock clock = new FakeClock(); FakeClock clock = new FakeClock();
final ResourcePool<Something> pool = getResourcePool( stateMonitor, clock, MIN_SIZE ); final ResourcePool<Something> pool = getResourcePool( stateMonitor, clock, poolMinSize );


// when // when
for ( int i = 0; i < 200; i++ ) for ( int i = 0; i < 200; i++ )
Expand All @@ -182,141 +186,139 @@ public void shouldKeepSmallPeakAndNeverDisposeIfAcquireAndReleaseContinuously()
public void shouldSlowlyReduceTheNumberOfResourcesInThePoolWhenResourcesAreReleased() throws Exception public void shouldSlowlyReduceTheNumberOfResourcesInThePoolWhenResourcesAreReleased() throws Exception
{ {
// given // given
final int MIN_SIZE = 50; final int poolMinSize = 50;
final int MAX_SIZE = 200; final int poolMaxSize = 200;


StatefulMonitor stateMonitor = new StatefulMonitor(); StatefulMonitor stateMonitor = new StatefulMonitor();
FakeClock clock = new FakeClock(); FakeClock clock = new FakeClock();
final ResourcePool<Something> pool = getResourcePool( stateMonitor, clock, MIN_SIZE ); final ResourcePool<Something> pool = getResourcePool( stateMonitor, clock, poolMinSize );
List<ResourceHolder> holders = new LinkedList<ResourceHolder>();


buildAPeakOfAcquiredResourcesAndTriggerAlarmWithSideEffects( MAX_SIZE, clock, pool, holders ); acquireResourcesAndExceedTimeout( pool, clock, poolMaxSize );


// when // when
// After the peak, stay below MIN_SIZE concurrent usage, using up all already present resources. // After the peak, stay below MIN_SIZE concurrent usage, using up all already present resources.
clock.forward( 110, TimeUnit.MILLISECONDS ); exceedTimeout( clock );
for ( int i = 0; i < MAX_SIZE; i++ ) for ( int i = 0; i < poolMaxSize; i++ )
{ {
acquireFromPool( pool, 1 ).get( 0 ).release(); acquireFromPool( pool, 1 ).get( 0 ).release();
} }


// then // then

// currentPeakSize must have reset from the latest check to minimum size.
// currentPeakSize must have reset from the latest alarm to MIN_SIZE. assertEquals( 1, stateMonitor.currentPeakSize.get() ); // because of timeout
assertEquals( 1, stateMonitor.currentPeakSize.get() ); // Alarm // targetSize must be set to MIN_SIZE since currentPeakSize was that 2 checks ago and didn't increase
// targetSize must be set to MIN_SIZE since currentPeakSize was that 2 alarms ago and didn't increase assertEquals( poolMinSize, stateMonitor.targetSize.get() );
assertEquals( MIN_SIZE, stateMonitor.targetSize.get() );
// Only pooled resources must be used, disposing what is in excess // Only pooled resources must be used, disposing what is in excess
// +1 for the alarm from buildAPeakOfAcquiredResourcesAndTriggerAlarmWithSideEffects // +1 that was used to trigger exceed timeout check
assertEquals( MAX_SIZE - MIN_SIZE + 1, stateMonitor.disposed.get() ); assertEquals( poolMaxSize - poolMinSize + 1, stateMonitor.disposed.get() );
} }


@Test @Test
public void shouldMaintainPoolAtHighWatermarkWhenConcurrentUsagePassesMinSize() throws Exception public void shouldMaintainPoolHigherThenMinSizeWhenPeekUsagePasses() throws Exception
{ {
// given // given
final int MIN_SIZE = 50; final int poolMinSize = 50;
final int MAX_SIZE = 200; final int poolMaxSize = 200;
final int MID_SIZE = 90; final int afterPeekPoolSize = 90;


StatefulMonitor stateMonitor = new StatefulMonitor(); StatefulMonitor stateMonitor = new StatefulMonitor();
FakeClock clock = new FakeClock(); FakeClock clock = new FakeClock();
final ResourcePool<Something> pool = getResourcePool( stateMonitor, clock, MIN_SIZE ); final ResourcePool<Something> pool = getResourcePool( stateMonitor, clock, poolMinSize );
List<ResourceHolder> holders = new LinkedList<ResourceHolder>();


buildAPeakOfAcquiredResourcesAndTriggerAlarmWithSideEffects( MAX_SIZE, clock, pool, holders ); acquireResourcesAndExceedTimeout( pool, clock, poolMaxSize );


// when // when
// After the peak, stay at MID_SIZE concurrent usage, using up all already present resources in the process // After the peak, stay at afterPeekPoolSize concurrent usage, using up all already present resources in the process
// but also keeping the high watermark above the MIN_SIZE // but also keeping the high watermark above the minimum size
clock.forward( 110, TimeUnit.MILLISECONDS ); exceedTimeout( clock );
// Requires some rounds to happen, since there is constant racing between releasing and acquiring which does // Requires some rounds to happen, since there is constant racing between releasing and acquiring which does
// not always result in reaping of resources, as there is reuse // not always result in reaping of resources, as there is reuse
for ( int i = 0; i < 10; i++ ) for ( int i = 0; i < 10; i++ )
{ {
// The latch is necessary to reduce races between batches // The latch is necessary to reduce races between batches
CountDownLatch release = new CountDownLatch( MID_SIZE ); CountDownLatch release = new CountDownLatch( afterPeekPoolSize );
for ( ResourceHolder holder : acquireFromPool( pool, MID_SIZE ) ) for ( ResourceHolder holder : acquireFromPool( pool, afterPeekPoolSize ) )
{ {
holder.release( release ); holder.release( release );
} }
release.await(); release.await();
clock.forward( 110, TimeUnit.MILLISECONDS ); exceedTimeout( clock );
} }


// then // then
// currentPeakSize should be at MID_SIZE // currentPeakSize should be at afterPeekPoolSize
assertEquals( MID_SIZE, stateMonitor.currentPeakSize.get() ); assertEquals( afterPeekPoolSize, stateMonitor.currentPeakSize.get() );
// target size too // target size too
assertEquals( MID_SIZE, stateMonitor.targetSize.get() ); assertEquals( afterPeekPoolSize, stateMonitor.targetSize.get() );
// only the excess from the MAX_SIZE down to mid size must have been disposed // only the excess from the maximum size down to after peek usage size must have been disposed
// +1 for the alarm from buildAPeakOfAcquiredResourcesAndTriggerAlarmWithSideEffects // +1 that was used to trigger exceed timeout check
assertEquals( MAX_SIZE - MID_SIZE + 1, stateMonitor.disposed.get() ); assertEquals( poolMaxSize - afterPeekPoolSize + 1, stateMonitor.disposed.get() );
} }


@Test @Test
public void shouldReclaimAndRecreateWhenLullBetweenSpikesOccurs() throws Exception public void shouldReclaimAndRecreateWhenUsageGoesDownBetweenSpikes() throws Exception
{ {
// given // given
final int MIN_SIZE = 50; final int poolMinSize = 50;
final int BELOW_MIN_SIZE = MIN_SIZE / 5; final int bellowPoolMinSize = poolMinSize / 5;
final int MAX_SIZE = 200; final int poolMaxSize = 200;


StatefulMonitor stateMonitor = new StatefulMonitor(); StatefulMonitor stateMonitor = new StatefulMonitor();
FakeClock clock = new FakeClock(); FakeClock clock = new FakeClock();
final ResourcePool<Something> pool = getResourcePool( stateMonitor, clock, MIN_SIZE ); final ResourcePool<Something> pool = getResourcePool( stateMonitor, clock, poolMinSize );
List<ResourceHolder> holders = new LinkedList<ResourceHolder>();


buildAPeakOfAcquiredResourcesAndTriggerAlarmWithSideEffects( MAX_SIZE, clock, pool, holders ); acquireResourcesAndExceedTimeout( pool, clock, poolMaxSize );


// when // when
// After the peak, stay well below concurrent usage, using up all already present resources in the process // After the peak, stay well below concurrent usage, using up all already present resources in the process
clock.forward( 110, TimeUnit.MILLISECONDS ); exceedTimeout( clock );
// Requires some rounds to happen, since there is constant racing between releasing and acquiring which does // Requires some rounds to happen, since there is constant racing between releasing and acquiring which does
// not always result in reaping of resources, as there is reuse // not always result in reaping of resources, as there is reuse
for ( int i = 0; i < 30; i++ ) for ( int i = 0; i < 30; i++ )
{ {
// The latch is necessary to reduce races between batches // The latch is necessary to reduce races between batches
CountDownLatch release = new CountDownLatch( BELOW_MIN_SIZE ); CountDownLatch release = new CountDownLatch( bellowPoolMinSize );
for ( ResourceHolder holder : acquireFromPool( pool, BELOW_MIN_SIZE ) ) for ( ResourceHolder holder : acquireFromPool( pool, bellowPoolMinSize ) )
{ {
holder.release( release ); holder.release( release );
} }
release.await(); release.await();
clock.forward( 110, TimeUnit.MILLISECONDS ); exceedTimeout( clock );
} }


// then // then
// currentPeakSize should be at MIN_SIZE / 5 // currentPeakSize should be at bellowPoolMinSize
assertEquals( BELOW_MIN_SIZE, stateMonitor.currentPeakSize.get() ); assertEquals( bellowPoolMinSize, stateMonitor.currentPeakSize.get() );
// target size should remain at MIN_SIZE // target size should remain at pool min size
assertEquals( MIN_SIZE, stateMonitor.targetSize.get() ); assertEquals( poolMinSize, stateMonitor.targetSize.get() );
// only the excess from the MAX_SIZE down to min size must have been disposed // only the excess from the pool max size down to min size must have been disposed
// +1 for the alarm from buildAPeakOfAcquiredResourcesAndTriggerAlarmWithSideEffects // +1 that was used to trigger initial exceed timeout check
assertEquals( MAX_SIZE - MIN_SIZE + 1, stateMonitor.disposed.get() ); assertEquals( poolMaxSize - poolMinSize + 1, stateMonitor.disposed.get() );


stateMonitor.created.set( 0 ); stateMonitor.created.set( 0 );
stateMonitor.disposed.set( 0 ); stateMonitor.disposed.set( 0 );


// when // when
// After the lull, recreate a peak // After the lull, recreate a peak
buildAPeakOfAcquiredResourcesAndTriggerAlarmWithSideEffects( MAX_SIZE, clock, pool, holders ); acquireResourcesAndExceedTimeout( pool, clock, poolMaxSize );


// then // then
assertEquals( MAX_SIZE - MIN_SIZE + 1, stateMonitor.created.get() ); assertEquals( poolMaxSize - poolMinSize + 1, stateMonitor.created.get() );
assertEquals( 0, stateMonitor.disposed.get() ); assertEquals( 0, stateMonitor.disposed.get() );
}


private void exceedTimeout( FakeClock clock )
{
clock.forward( TIMEOUT_EXCEED_MILLIS, TimeUnit.MILLISECONDS );
} }


private void buildAPeakOfAcquiredResourcesAndTriggerAlarmWithSideEffects( int MAX_SIZE, FakeClock clock, private void acquireResourcesAndExceedTimeout( ResourcePool<Something> pool,
ResourcePool<Something> FakeClock clock, int resourcesToAcquire ) throws InterruptedException
pool,
List<ResourceHolder> holders ) throws
InterruptedException
{ {
holders.addAll( acquireFromPool( pool, MAX_SIZE ) ); List<ResourceHolder> holders = new LinkedList<>();
holders.addAll( acquireFromPool( pool, resourcesToAcquire ) );


clock.forward( 110, TimeUnit.MILLISECONDS ); exceedTimeout( clock );


// "Ring the bell" only on acquisition, of course. // "Ring the bell" only on acquisition, of course.
holders.addAll( acquireFromPool( pool, 1 ) ); holders.addAll( acquireFromPool( pool, 1 ) );
Expand All @@ -327,32 +329,18 @@ private void buildAPeakOfAcquiredResourcesAndTriggerAlarmWithSideEffects( int MA
} }
} }


private ResourcePool<Something> getResourcePool( StatefulMonitor stateMonitor, private ResourcePool<Something> getResourcePool( StatefulMonitor stateMonitor, FakeClock clock, int minSize )
FakeClock clock,
int minSize )
{ {
return new ResourcePool<Something>( minSize, ResourcePool.CheckStrategy.TimeoutCheckStrategy timeoutCheckStrategy =
new ResourcePool.CheckStrategy.TimeoutCheckStrategy( 100, clock ), stateMonitor ) new ResourcePool.CheckStrategy.TimeoutCheckStrategy( TIMEOUT_MILLIS, clock );
{ return new SomethingResourcePool( minSize, timeoutCheckStrategy, stateMonitor );
@Override
protected Something create()
{
return new Something();
}

@Override
protected boolean isAlive( Something resource )
{
return !resource.closed;
}
};
} }


private List<ResourceHolder> acquireFromPool( final ResourcePool pool, int times ) throws InterruptedException private List<ResourceHolder> acquireFromPool( ResourcePool pool, int resourcesToAcquire ) throws InterruptedException
{ {
List<ResourceHolder> acquirers = new LinkedList<ResourceHolder>(); List<ResourceHolder> acquirers = new LinkedList<>();
final CountDownLatch latch = new CountDownLatch( times ); final CountDownLatch latch = new CountDownLatch( resourcesToAcquire );
for ( int i = 0; i < times; i++ ) for ( int i = 0; i < resourcesToAcquire; i++ )
{ {
ResourceHolder holder = new ResourceHolder( pool, latch ); ResourceHolder holder = new ResourceHolder( pool, latch );
Thread t = new Thread( holder ); Thread t = new Thread( holder );
Expand All @@ -363,6 +351,26 @@ private List<ResourceHolder> acquireFromPool( final ResourcePool pool, int times
return acquirers; return acquirers;
} }


private static class SomethingResourcePool extends ResourcePool<Something>
{
public SomethingResourcePool( int minSize, CheckStrategy checkStrategy, StatefulMonitor stateMonitor )
{
super( minSize, checkStrategy, stateMonitor );
}

@Override
protected Something create()
{
return new Something();
}

@Override
protected boolean isAlive( Something resource )
{
return !resource.closed;
}
}

private class ResourceHolder implements Runnable private class ResourceHolder implements Runnable
{ {
private final Semaphore latch = new Semaphore( 0 ); private final Semaphore latch = new Semaphore( 0 );
Expand Down

0 comments on commit e8b1118

Please sign in to comment.