Skip to content

Commit

Permalink
Wait for open calls to finish on index drop and close
Browse files Browse the repository at this point in the history
Currently any concurrent open call to index will cause exception to be throw,
that will cause database panic.
To relax current state a bit and do minimal changes to a way how we handle
state in contract checking index we will wait now for all open calls to finish
before performing drop or close operations.
  • Loading branch information
MishaDemianenko committed Jun 27, 2017
1 parent 769dbdb commit dd2ebd6
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 53 deletions.
Expand Up @@ -21,8 +21,10 @@


import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;


import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.IndexUpdater;
Expand Down Expand Up @@ -141,7 +143,7 @@ public Future<Void> drop() throws IOException


if ( state.compareAndSet( State.STARTED, State.CLOSED ) ) if ( state.compareAndSet( State.STARTED, State.CLOSED ) )
{ {
ensureNoOpenCalls( "drop" ); waitOpenCallsToClose();
return super.drop(); return super.drop();
} }


Expand All @@ -159,13 +161,21 @@ public Future<Void> close() throws IOException


if ( state.compareAndSet( State.STARTED, State.CLOSED ) ) if ( state.compareAndSet( State.STARTED, State.CLOSED ) )
{ {
ensureNoOpenCalls( "close" ); waitOpenCallsToClose();
return super.close(); return super.close();
} }


throw new IllegalStateException( "IndexProxy already closed" ); throw new IllegalStateException( "IndexProxy already closed" );
} }


private void waitOpenCallsToClose()
{
while ( openCalls.intValue() > 0 )
{
LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( 10 ) );
}
}

private void openCall( String name ) private void openCall( String name )
{ {
// do not open call unless we are in STARTED // do not open call unless we are in STARTED
Expand All @@ -181,13 +191,6 @@ private void openCall( String name )
throw new IllegalStateException("Cannot call " + name + "() when index state is " + state.get() ); throw new IllegalStateException("Cannot call " + name + "() when index state is " + state.get() );
} }


private void ensureNoOpenCalls(String name)
{
if (openCalls.get() > 0)
throw new IllegalStateException( "Concurrent " + name + "() while updates have not completed" );

}

private void closeCall() private void closeCall()
{ {
// rollback once the call finished or failed // rollback once the call finished or failed
Expand Down
Expand Up @@ -22,15 +22,20 @@
import org.junit.Test; import org.junit.Test;


import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;


import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.test.DoubleLatch; import org.neo4j.test.DoubleLatch;
import org.neo4j.test.ThreadTestUtils;


import static org.neo4j.kernel.impl.api.index.SchemaIndexTestHelper.mockIndexProxy; import static org.neo4j.kernel.impl.api.index.SchemaIndexTestHelper.mockIndexProxy;


public class ContractCheckingIndexProxyTest public class ContractCheckingIndexProxyTest
{ {
private static final long TEST_TIMEOUT = 10_000;

@Test( expected = /* THEN */ IllegalStateException.class ) @Test( expected = /* THEN */ IllegalStateException.class )
public void shouldNotCreateIndexTwice() throws IOException public void shouldNotCreateIndexTwice() throws IOException
{ {
Expand Down Expand Up @@ -219,11 +224,11 @@ public void start()
} }
} }


@Test( expected = /* THEN */ IllegalStateException.class ) @Test( timeout = TEST_TIMEOUT)
public void shouldNotCloseWhileUpdating() throws IOException public void closeWaitForUpdateToFinish() throws IOException, InterruptedException
{ {
// GIVEN // GIVEN
final DoubleLatch latch = new DoubleLatch(); CountDownLatch latch = new CountDownLatch( 1 );
final IndexProxy inner = new IndexProxyAdapter() final IndexProxy inner = new IndexProxyAdapter()
{ {
@Override @Override
Expand All @@ -233,81 +238,96 @@ public IndexUpdater newUpdater( IndexUpdateMode mode )
} }
}; };
final IndexProxy outer = newContractCheckingIndexProxy( inner ); final IndexProxy outer = newContractCheckingIndexProxy( inner );
Thread actionThread = createActionThread( outer::close );
outer.start(); outer.start();


// WHEN // WHEN
runInSeparateThread( () -> Thread updaterThread = runInSeparateThread( () -> {
{ try ( IndexUpdater updater = outer.newUpdater( IndexUpdateMode.ONLINE ) )
try (IndexUpdater updater = outer.newUpdater( IndexUpdateMode.ONLINE ))
{ {
updater.process( null ); updater.process( null );
latch.startAndWaitForAllToStartAndFinish(); try
{
actionThread.start();
latch.await();
}
catch ( InterruptedException e )
{
throw new RuntimeException( e );
}
} }
catch ( IndexEntryConflictException e ) catch ( IndexEntryConflictException e )
{ {
throw new RuntimeException( e ); throw new RuntimeException( e );
} }
} ); } );


try ThreadTestUtils.awaitThreadState( actionThread, 5_000, Thread.State.TIMED_WAITING );
{ latch.countDown();
latch.waitForAllToStart(); updaterThread.join();
outer.close(); actionThread.join();
}
finally
{
latch.finish();
}
} }


@Test( expected = /* THEN */ IllegalStateException.class ) @Test( timeout = TEST_TIMEOUT )
public void shouldNotCloseWhileForcing() throws IOException public void closeWaitForForceToComplete() throws Exception
{ {
// GIVEN // GIVEN
final DoubleLatch latch = new DoubleLatch(); CountDownLatch latch = new CountDownLatch( 1 );
AtomicReference<Thread> actionThreadReference = new AtomicReference<>();
final IndexProxy inner = new IndexProxyAdapter() final IndexProxy inner = new IndexProxyAdapter()
{ {
@Override @Override
public void force() public void force()
{ {
latch.startAndWaitForAllToStartAndFinish(); try
{
actionThreadReference.get().start();
latch.await();
}
catch ( Exception e )
{
throw new RuntimeException( e );
}
} }
}; };
final IndexProxy outer = newContractCheckingIndexProxy( inner ); IndexProxy outer = newContractCheckingIndexProxy( inner );
Thread actionThread = createActionThread( outer::close );
actionThreadReference.set( actionThread );

outer.start(); outer.start();
Thread thread = runInSeparateThread( outer::force );


// WHEN ThreadTestUtils.awaitThreadState( actionThread, 5_000, Thread.State.TIMED_WAITING );
runInSeparateThread( () -> outer.force() ); latch.countDown();


try thread.join();
{ actionThread.join();
latch.waitForAllToStart();
outer.close();
}
finally
{
latch.finish();
}
} }


private interface ThrowingRunnable private interface ThrowingRunnable
{ {
void run() throws IOException; void run() throws IOException;
} }


private void runInSeparateThread( final ThrowingRunnable action ) private Thread runInSeparateThread( final ThrowingRunnable action )
{ {
new Thread( () -> Thread thread = createActionThread( action );
{ thread.start();
try return thread;
{ }
action.run();
} private Thread createActionThread( ThrowingRunnable action )
catch ( IOException e ) {
{ return new Thread( () -> {
throw new RuntimeException( e ); try
} {
} ).start(); action.run();
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
} );
} }


private ContractCheckingIndexProxy newContractCheckingIndexProxy( IndexProxy inner ) private ContractCheckingIndexProxy newContractCheckingIndexProxy( IndexProxy inner )
Expand Down

0 comments on commit dd2ebd6

Please sign in to comment.