diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxy.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxy.java index e08b3f909796..a485cdc69ef7 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxy.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxy.java @@ -21,8 +21,10 @@ import java.io.IOException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; import org.neo4j.kernel.api.index.IndexUpdater; @@ -141,7 +143,7 @@ public Future drop() throws IOException if ( state.compareAndSet( State.STARTED, State.CLOSED ) ) { - ensureNoOpenCalls( "drop" ); + waitOpenCallsToClose(); return super.drop(); } @@ -159,13 +161,21 @@ public Future close() throws IOException if ( state.compareAndSet( State.STARTED, State.CLOSED ) ) { - ensureNoOpenCalls( "close" ); + waitOpenCallsToClose(); return super.close(); } throw new IllegalStateException( "IndexProxy already closed" ); } + private void waitOpenCallsToClose() + { + while ( openCalls.intValue() > 0 ) + { + LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( 10 ) ); + } + } + private void openCall( String name ) { // do not open call unless we are in STARTED @@ -181,13 +191,6 @@ private void openCall( String name ) throw new IllegalStateException("Cannot call " + name + "() when index state is " + state.get() ); } - private void ensureNoOpenCalls(String name) - { - if (openCalls.get() > 0) - throw new IllegalStateException( "Concurrent " + name + "() while updates have not completed" ); - - } - private void closeCall() { // rollback once the call finished or failed diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxyTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxyTest.java index 160cd97a2cfb..77413a8f35a0 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxyTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/ContractCheckingIndexProxyTest.java @@ -22,15 +22,20 @@ import org.junit.Test; import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.test.DoubleLatch; +import org.neo4j.test.ThreadTestUtils; import static org.neo4j.kernel.impl.api.index.SchemaIndexTestHelper.mockIndexProxy; public class ContractCheckingIndexProxyTest { + private static final long TEST_TIMEOUT = 10_000; + @Test( expected = /* THEN */ IllegalStateException.class ) public void shouldNotCreateIndexTwice() throws IOException { @@ -219,11 +224,11 @@ public void start() } } - @Test( expected = /* THEN */ IllegalStateException.class ) - public void shouldNotCloseWhileUpdating() throws IOException + @Test( timeout = TEST_TIMEOUT) + public void closeWaitForUpdateToFinish() throws IOException, InterruptedException { // GIVEN - final DoubleLatch latch = new DoubleLatch(); + CountDownLatch latch = new CountDownLatch( 1 ); final IndexProxy inner = new IndexProxyAdapter() { @Override @@ -233,15 +238,23 @@ public IndexUpdater newUpdater( IndexUpdateMode mode ) } }; final IndexProxy outer = newContractCheckingIndexProxy( inner ); + Thread actionThread = createActionThread( outer::close ); outer.start(); // WHEN - runInSeparateThread( () -> - { - try (IndexUpdater updater = outer.newUpdater( IndexUpdateMode.ONLINE )) + Thread updaterThread = runInSeparateThread( () -> { + try ( IndexUpdater updater = outer.newUpdater( IndexUpdateMode.ONLINE ) ) { updater.process( null ); - latch.startAndWaitForAllToStartAndFinish(); + try + { + actionThread.start(); + latch.await(); + } + catch ( InterruptedException e ) + { + throw new RuntimeException( e ); + } } catch ( IndexEntryConflictException e ) { @@ -249,45 +262,46 @@ public IndexUpdater newUpdater( IndexUpdateMode mode ) } } ); - try - { - latch.waitForAllToStart(); - outer.close(); - } - finally - { - latch.finish(); - } + ThreadTestUtils.awaitThreadState( actionThread, 5_000, Thread.State.TIMED_WAITING ); + latch.countDown(); + updaterThread.join(); + actionThread.join(); } - @Test( expected = /* THEN */ IllegalStateException.class ) - public void shouldNotCloseWhileForcing() throws IOException + @Test( timeout = TEST_TIMEOUT ) + public void closeWaitForForceToComplete() throws Exception { // GIVEN - final DoubleLatch latch = new DoubleLatch(); + CountDownLatch latch = new CountDownLatch( 1 ); + AtomicReference actionThreadReference = new AtomicReference<>(); final IndexProxy inner = new IndexProxyAdapter() { @Override public void force() { - latch.startAndWaitForAllToStartAndFinish(); + try + { + actionThreadReference.get().start(); + latch.await(); + } + catch ( Exception e ) + { + throw new RuntimeException( e ); + } } }; - final IndexProxy outer = newContractCheckingIndexProxy( inner ); + IndexProxy outer = newContractCheckingIndexProxy( inner ); + Thread actionThread = createActionThread( outer::close ); + actionThreadReference.set( actionThread ); + outer.start(); + Thread thread = runInSeparateThread( outer::force ); - // WHEN - runInSeparateThread( () -> outer.force() ); + ThreadTestUtils.awaitThreadState( actionThread, 5_000, Thread.State.TIMED_WAITING ); + latch.countDown(); - try - { - latch.waitForAllToStart(); - outer.close(); - } - finally - { - latch.finish(); - } + thread.join(); + actionThread.join(); } private interface ThrowingRunnable @@ -295,19 +309,25 @@ private interface ThrowingRunnable void run() throws IOException; } - private void runInSeparateThread( final ThrowingRunnable action ) + private Thread runInSeparateThread( final ThrowingRunnable action ) { - new Thread( () -> - { - try - { - action.run(); - } - catch ( IOException e ) - { - throw new RuntimeException( e ); - } - } ).start(); + Thread thread = createActionThread( action ); + thread.start(); + return thread; + } + + private Thread createActionThread( ThrowingRunnable action ) + { + return new Thread( () -> { + try + { + action.run(); + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + } ); } private ContractCheckingIndexProxy newContractCheckingIndexProxy( IndexProxy inner )