Skip to content

Commit

Permalink
Faster KernelTransactionsTest
Browse files Browse the repository at this point in the history
by using more efficient lock wait assertions
  • Loading branch information
tinwelint committed Sep 19, 2016
1 parent c5f0d5c commit bdf662a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 41 deletions.
Expand Up @@ -19,18 +19,16 @@
*/
package org.neo4j.kernel.impl.api;

import org.junit.Rule;
import org.junit.Test;

import java.time.Clock;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;

Expand Down Expand Up @@ -66,10 +64,10 @@
import org.neo4j.storageengine.api.lock.ResourceLocker;
import org.neo4j.storageengine.api.txstate.ReadableTransactionState;
import org.neo4j.test.Race;
import org.neo4j.test.rule.concurrent.OtherThreadRule;
import org.neo4j.time.Clocks;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.locks.LockSupport.parkNanos;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
Expand All @@ -89,10 +87,15 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.helpers.collection.Iterators.asSet;
import static org.neo4j.kernel.api.KernelTransaction.Type.explicit;
import static org.neo4j.kernel.api.security.AccessMode.Static.WRITE;
import static org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory.DEFAULT;

public class KernelTransactionsTest
{
@Rule
public final OtherThreadRule<Void> t2 = new OtherThreadRule<>( "T2-" + getClass().getName() );

@Test
public void shouldListActiveTransactions() throws Exception
{
Expand Down Expand Up @@ -332,10 +335,10 @@ public void blockNewTransactions() throws Exception
KernelTransactions kernelTransactions = newKernelTransactions();
kernelTransactions.blockNewTransactions();

CountDownLatch aboutToStartTx = new CountDownLatch( 1 );
Future<KernelTransaction> txOpener = startTxInSeparateThread( kernelTransactions, aboutToStartTx );
Future<KernelTransaction> txOpener =
t2.execute( state -> kernelTransactions.newInstance( explicit, WRITE, 0L ) );
t2.get().waitUntilWaiting( location -> location.isAt( KernelTransactions.class, "newInstance" ) );

await( aboutToStartTx );
assertNotDone( txOpener );

kernelTransactions.unblockNewTransactions();
Expand All @@ -348,10 +351,10 @@ public void unblockNewTransactionsFromWrongThreadThrows() throws Exception
KernelTransactions kernelTransactions = newKernelTransactions();
kernelTransactions.blockNewTransactions();

CountDownLatch aboutToStartTx = new CountDownLatch( 1 );
Future<KernelTransaction> txOpener = startTxInSeparateThread( kernelTransactions, aboutToStartTx );
Future<KernelTransaction> txOpener =
t2.execute( state -> kernelTransactions.newInstance( explicit, WRITE, 0L ) );
t2.get().waitUntilWaiting( location -> location.isAt( KernelTransactions.class, "newInstance" ) );

await( aboutToStartTx );
assertNotDone( txOpener );

Future<?> wrongUnblocker = unblockTxsInSeparateThread( kernelTransactions );
Expand Down Expand Up @@ -472,41 +475,14 @@ private static TransactionCommitProcess newRememberingCommitProcess( final Trans
return commitProcess;
}

private static Future<KernelTransaction> startTxInSeparateThread( final KernelTransactions kernelTransactions,
final CountDownLatch aboutToStartTx )
{
return Executors.newSingleThreadExecutor().submit( new Callable<KernelTransaction>()
{
@Override
public KernelTransaction call()
{
aboutToStartTx.countDown();
return kernelTransactions.newInstance( KernelTransaction.Type.explicit, AccessMode.Static.WRITE, 0L );
}
} );
}

private static Future<?> unblockTxsInSeparateThread( final KernelTransactions kernelTransactions )
{
return Executors.newSingleThreadExecutor().submit( kernelTransactions::unblockNewTransactions );
}

private void await( CountDownLatch latch ) throws InterruptedException
{
assertTrue( latch.await( 1, MINUTES ) );
}

private static void assertNotDone( Future<?> future )
{
try
{
future.get( 2, TimeUnit.SECONDS );
fail( "Exception expected" );
}
catch ( Exception e )
{
assertThat( e, instanceOf( TimeoutException.class ) );
}
assertFalse( future.isDone() );
}

private static KernelTransactionHandle newHandle( KernelTransaction tx )
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.Closeable;
import java.io.PrintStream;
import java.lang.Thread.State;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
Expand All @@ -30,11 +31,13 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Predicate;

import org.neo4j.logging.Logger;

import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -217,17 +220,45 @@ public String toString()

public WaitDetails waitUntilWaiting() throws TimeoutException
{
return waitUntilThreadState( Thread.State.WAITING, Thread.State.TIMED_WAITING );
return waitUntilWaiting( details -> true );
}

public WaitDetails waitUntilBlocked() throws TimeoutException
{
return waitUntilThreadState( Thread.State.BLOCKED );
return waitUntilBlocked( details -> true );
}

public WaitDetails waitUntilWaiting( Predicate<WaitDetails> correctWait ) throws TimeoutException
{
return waitUntilThreadState( correctWait, Thread.State.WAITING, Thread.State.TIMED_WAITING );
}

public WaitDetails waitUntilBlocked( Predicate<WaitDetails> correctWait ) throws TimeoutException
{
return waitUntilThreadState( correctWait, Thread.State.BLOCKED );
}

public WaitDetails waitUntilThreadState( final Thread.State... possibleStates ) throws TimeoutException
{
return waitUntil( new AnyThreadState( possibleStates ) );
return waitUntilThreadState( details -> true, possibleStates );
}

public WaitDetails waitUntilThreadState( Predicate<WaitDetails> correctWait,
final Thread.State... possibleStates ) throws TimeoutException
{
long end = currentTimeMillis() + timeout;
WaitDetails details = null;
while ( !correctWait.test( (details = waitUntil( new AnyThreadState( possibleStates )) ) ) )
{
LockSupport.parkNanos( MILLISECONDS.toNanos( 20 ) );
if ( currentTimeMillis() > end )
{
throw new TimeoutException( "Wanted to wait for any of " + Arrays.toString( possibleStates ) +
" over at " + correctWait + ", but didn't managed to get there in " + timeout + "ms. " +
"instead ended up waiting in " + details );
}
}
return details;
}

public WaitDetails waitUntil( Predicate<Thread> condition ) throws TimeoutException
Expand Down

0 comments on commit bdf662a

Please sign in to comment.