Skip to content

Commit

Permalink
Added tests for blocking of new transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
lutovich authored and burqen committed Jun 30, 2016
1 parent dc56775 commit 07167b9
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,10 @@ public KernelTransactionImplementation newInstance()
}
};

// TODO: Optimize with read / write locks and or only do it on slaves
@Override
public KernelTransaction newInstance()
{
assert !newTransactionsLock.isWriteLockedByCurrentThread();
assertCurrentThreadIsNotBlockingNewTransactions();
newTransactionsLock.readLock().lock();
try
{
Expand Down Expand Up @@ -297,9 +296,9 @@ public void blockNewTransactions()
}

/**
* Allow new transactions to be started again if current thread is the one who called {@link #blockNewTransactions()}.
* If current thread is not the one that called {@link #blockNewTransactions()} or it has not been called at all
* then NO_OP.
* Allow new transactions to be started again if current thread is the one who called
* {@link #blockNewTransactions()}. If current thread is not the one that called {@link #blockNewTransactions()}
* or it has not been called at all then NO_OP.
*/
public void unblockNewTransactions()
{
Expand All @@ -308,4 +307,13 @@ public void unblockNewTransactions()
newTransactionsLock.writeLock().unlock();
}
}

private void assertCurrentThreadIsNotBlockingNewTransactions()
{
if ( newTransactionsLock.isWriteLockedByCurrentThread() )
{
throw new IllegalStateException(
"Thread that is blocking new transactions from starting can't start new transaction" );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;

Expand Down Expand Up @@ -56,14 +62,18 @@
import org.neo4j.test.Race;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.locks.LockSupport.parkNanos;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -246,11 +256,56 @@ public void run()
}

@Test
public void failsToCreateNewInstanceWhenCurrentThreadBlockedNewTxs() throws Exception
public void threadThatBlocksNewTxsCantStartNewTxs() throws Exception
{
KernelTransactions kernelTransactions = null;
KernelTransactions kernelTransactions = newKernelTransactions();
kernelTransactions.blockNewTransactions();
kernelTransactions.newInstance().close();
try
{
kernelTransactions.newInstance();
fail( "Exception expected" );
}
catch ( Exception e )
{
assertThat( e, instanceOf( IllegalStateException.class ) );
}
}

@Test
public void blockNewTransactions() throws Exception
{
KernelTransactions kernelTransactions = newKernelTransactions();
kernelTransactions.blockNewTransactions();

CountDownLatch aboutToStartTx = new CountDownLatch( 1 );
Future<KernelTransaction> txOpener = startTxInSeparateThread( kernelTransactions, aboutToStartTx );

await( aboutToStartTx );
assertNotDone( txOpener );

kernelTransactions.unblockNewTransactions();
assertNotNull( txOpener.get( 2, TimeUnit.SECONDS ) );
}

@Test
public void unblockNewTransactionsFromWrongThreadDoesNothing() throws Exception
{
KernelTransactions kernelTransactions = newKernelTransactions();
kernelTransactions.blockNewTransactions();

CountDownLatch aboutToStartTx = new CountDownLatch( 1 );
Future<KernelTransaction> txOpener = startTxInSeparateThread( kernelTransactions, aboutToStartTx );

await( aboutToStartTx );
assertNotDone( txOpener );

Future<?> wrongUnblocker = unblockTxsInSeparateThread( kernelTransactions );

assertDone( wrongUnblocker );
assertNotDone( txOpener );

kernelTransactions.unblockNewTransactions();
assertNotNull( txOpener.get( 2, TimeUnit.SECONDS ) );
}

private static KernelTransactions newKernelTransactions()
Expand Down Expand Up @@ -331,4 +386,53 @@ private static NeoStoreTransactionContextFactory newMockContextFactoryWithChange
when( factory.newInstance() ).thenReturn( context );
return factory;
}

private static Future<KernelTransaction> startTxInSeparateThread( final KernelTransactions kernelTransactions,
final CountDownLatch aboutToStartTx )
{
return Executors.newSingleThreadExecutor().submit( new Callable<KernelTransaction>()
{
@Override
public KernelTransaction call()
{
aboutToStartTx.countDown();
return kernelTransactions.newInstance();
}
} );
}

private static Future<?> unblockTxsInSeparateThread( final KernelTransactions kernelTransactions )
{
return Executors.newSingleThreadExecutor().submit( new Runnable()
{
@Override
public void run()
{
kernelTransactions.unblockNewTransactions();
}
} );
}

private void await( CountDownLatch latch ) throws InterruptedException
{
assertTrue( latch.await( 1, MINUTES ) );
}

private static void assertDone( Future<?> future ) throws Exception
{
assertNull( future.get( 2, TimeUnit.SECONDS ) );
}

private static void assertNotDone( Future<?> future )
{
try
{
future.get( 2, TimeUnit.SECONDS );
fail( "Exception expected" );
}
catch ( Exception e )
{
assertThat( e, instanceOf( TimeoutException.class ) );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,12 @@ public Visitor<CommittedTransactionRepresentation,IOException> transactions()
public boolean visit( CommittedTransactionRepresentation transaction ) throws IOException
{
// TODO:
// - Change implementation of "freeze active transactions". How to block new transactions from starting
// - Maybe move from time -> tx ids as safe zone (or support both)
// - Flush buffered ids to disk (if memory intrusive)
// - Only do assertOpen after property reads if we are slave
// v Change implementation of "freeze active transactions". How to block new transactions from starting
// - Maybe move from time -> tx ids as safe zone (or support both) -- ??
// - Flush buffered ids to disk (if memory intrusive) -- ??
// - Only do assertOpen after property reads if we are slave or wait for transactions to terminate
// - Place all comments describing the problem / solution in a better place
// - Clean up how we handle Transient...Exception, TxTerminatedException, etc. + how we use Status.*
// v Clean up how we handle Transient...Exception, TxTerminatedException, etc. + how we use Status.*

// PROBLEM
// A slave can read inconsistent or corrupted data (mixed state records) because of reuse of property ids.
Expand Down

0 comments on commit 07167b9

Please sign in to comment.