Skip to content

Commit

Permalink
Cleanup: Replace Thread creation with executor
Browse files Browse the repository at this point in the history
  • Loading branch information
burqen authored and tinwelint committed Mar 30, 2017
1 parent 7215282 commit cc891fb
Showing 1 changed file with 51 additions and 30 deletions.
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.index.internal.gbptree; package org.neo4j.index.internal.gbptree;


import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableLong;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
Expand All @@ -31,7 +32,13 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -82,13 +89,21 @@ public class GBPTreeTest
private PageCache pageCache; private PageCache pageCache;
private File indexFile; private File indexFile;
private static final Layout<MutableLong,MutableLong> layout = new SimpleLongLayout(); private static final Layout<MutableLong,MutableLong> layout = new SimpleLongLayout();
private ExecutorService executor;


@Before @Before
public void setUpIndexFile() public void setUp()
{ {
executor = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() );
indexFile = directory.file( "index" ); indexFile = directory.file( "index" );
} }


@After
public void teardown()
{
executor.shutdown();
}

/* Meta and state page tests */ /* Meta and state page tests */


@Test @Test
Expand Down Expand Up @@ -520,18 +535,17 @@ public void checkPointShouldLockOutWriter() throws Exception


// WHEN // WHEN
monitor.enabled = true; monitor.enabled = true;
Thread checkpointer = new Thread( throwing( () -> index.checkpoint( unlimited() ) ) ); Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( unlimited() ) ) );
checkpointer.start();
monitor.barrier.awaitUninterruptibly(); monitor.barrier.awaitUninterruptibly();
// now we're in the smack middle of a checkpoint // now we're in the smack middle of a checkpoint
Thread t2 = new Thread( throwing( () -> index.writer().close() ) ); Future<?> writerClose = executor.submit( throwing( () -> index.writer().close() ) );
t2.start();
t2.join( 200 );
assertTrue( Arrays.toString( checkpointer.getStackTrace() ), t2.isAlive() );
monitor.barrier.release();


// THEN // THEN
t2.join(); wait( writerClose );
monitor.barrier.release();

writerClose.get();
checkpoint.get();
} }
} }


Expand All @@ -543,24 +557,22 @@ public void checkPointShouldWaitForWriter() throws Exception
{ {
// WHEN // WHEN
Barrier.Control barrier = new Barrier.Control(); Barrier.Control barrier = new Barrier.Control();
Thread writerThread = new Thread( throwing( () -> Future<?> write = executor.submit( throwing( () ->
{ {
try ( Writer<MutableLong,MutableLong> writer = index.writer() ) try ( Writer<MutableLong,MutableLong> writer = index.writer() )
{ {
writer.put( new MutableLong( 1 ), new MutableLong( 1 ) ); writer.put( new MutableLong( 1 ), new MutableLong( 1 ) );
barrier.reached(); barrier.reached();
} }
} ) ); } ) );
writerThread.start();
barrier.awaitUninterruptibly(); barrier.awaitUninterruptibly();
Thread checkpointer = new Thread( throwing( () -> index.checkpoint( unlimited() ) ) ); Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( unlimited() ) ) );
checkpointer.start(); wait( checkpoint );
checkpointer.join( 200 );
assertTrue( checkpointer.isAlive() );


// THEN // THEN
barrier.release(); barrier.release();
checkpointer.join(); checkpoint.get();
write.get();
} }
} }


Expand All @@ -580,12 +592,11 @@ public void closeShouldLockOutWriter() throws Exception


// WHEN // WHEN
enabled.set( true ); enabled.set( true );
Thread closer = new Thread( throwing( index::close ) ); Future<?> close = executor.submit( throwing( index::close ) );
closer.start();
barrier.awaitUninterruptibly(); barrier.awaitUninterruptibly();
// now we're in the smack middle of a close/checkpoint // now we're in the smack middle of a close/checkpoint
AtomicReference<Exception> writerError = new AtomicReference<>(); AtomicReference<Exception> writerError = new AtomicReference<>();
Thread t2 = new Thread( () -> Future<?> write = executor.submit( () ->
{ {
try try
{ {
Expand All @@ -597,13 +608,12 @@ public void closeShouldLockOutWriter() throws Exception
} }
} ); } );


t2.start(); wait( write );
t2.join( 200 );
assertTrue( Arrays.toString( closer.getStackTrace() ), t2.isAlive() );
barrier.release(); barrier.release();


// THEN // THEN
t2.join(); write.get();
close.get();
assertTrue( "Writer should not be able to acquired after close", assertTrue( "Writer should not be able to acquired after close",
writerError.get() instanceof IllegalStateException ); writerError.get() instanceof IllegalStateException );
} }
Expand Down Expand Up @@ -639,24 +649,22 @@ public void closeShouldWaitForWriter() throws Exception


// WHEN // WHEN
Barrier.Control barrier = new Barrier.Control(); Barrier.Control barrier = new Barrier.Control();
Thread writerThread = new Thread( throwing( () -> Future<?> write= executor.submit( throwing( () ->
{ {
try ( Writer<MutableLong,MutableLong> writer = index.writer() ) try ( Writer<MutableLong,MutableLong> writer = index.writer() )
{ {
writer.put( new MutableLong( 1 ), new MutableLong( 1 ) ); writer.put( new MutableLong( 1 ), new MutableLong( 1 ) );
barrier.reached(); barrier.reached();
} }
} ) ); } ) );
writerThread.start();
barrier.awaitUninterruptibly(); barrier.awaitUninterruptibly();
Thread closer = new Thread( throwing( index::close ) ); Future<?> close = executor.submit( throwing( index::close ) );
closer.start(); wait( close );
closer.join( 200 );
assertTrue( closer.isAlive() );


// THEN // THEN
barrier.release(); barrier.release();
closer.join(); close.get();
write.get();
} }


/* Insertion and read tests */ /* Insertion and read tests */
Expand Down Expand Up @@ -775,6 +783,19 @@ public void shouldNotCheckpointOnCloseIfNoChangesHappened() throws Exception
assertEquals( 1, checkpointCounter.count() ); assertEquals( 1, checkpointCounter.count() );
} }


private void wait( Future<?> future )throws InterruptedException, ExecutionException
{
try
{
future.get( 200, TimeUnit.MILLISECONDS );
fail( "Expected timeout" );
}
catch ( TimeoutException e )
{
// good
}
}

private PageCache createPageCache( int pageSize ) private PageCache createPageCache( int pageSize )
{ {
return pageCacheRule.getPageCache( fs.get(), config().withPageSize( pageSize ) ); return pageCacheRule.getPageCache( fs.get(), config().withPageSize( pageSize ) );
Expand Down

0 comments on commit cc891fb

Please sign in to comment.