Skip to content

Commit

Permalink
Guard index drop with commitCloseLock
Browse files Browse the repository at this point in the history
Guard index drop with same commit close lock since drop will try to close index first.
Preventing lucene race during two phase commit inside lucene when writer can't find corresponding commit call:
'cannot close: prepareCommit was already called with no corresponding call to commit'.

Remove unused method.
  • Loading branch information
MishaDemianenko committed Sep 26, 2016
1 parent fa26a7d commit 62e3196
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 38 deletions.
Expand Up @@ -402,30 +402,6 @@ private IndexSearcher searcher( boolean allowRefreshSearcher )
return this.searcher;
}

private static void safeClose( Object object )
{
if ( object == null )
{
return;
}

try
{
if ( object instanceof IndexWriter )
{
( ( IndexWriter ) object ).close();
}
else if ( object instanceof IndexReader )
{
( ( IndexReader ) object ).close();
}
}
catch ( IOException e )
{
// Ok
}
}

@Override
IndexSearcher asSearcher( TxDataHolder holder, QueryContext context )
{
Expand Down
Expand Up @@ -106,7 +106,15 @@ public boolean isValid()
@Override
public void drop() throws IOException
{
luceneIndex.drop();
commitCloseLock.lock();
try
{
luceneIndex.drop();
}
finally
{
commitCloseLock.unlock();
}
}

/**
Expand Down
Expand Up @@ -32,7 +32,9 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -45,6 +47,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.zip.ZipOutputStream;

import org.neo4j.io.fs.DefaultFileSystemAbstraction;
Expand All @@ -62,38 +65,50 @@
public class DatabaseIndexIntegrationTest
{
private static final int THREAD_NUMBER = 5;
private static ExecutorService workers;

@Rule
public TargetDirectory.TestDirectory testDir = TargetDirectory.testDirForTest( getClass() );
@Rule
public RepeatRule repeatRule = new RepeatRule();

private final CountDownLatch closeRaceSignal = new CountDownLatch( 1 );

private final CountDownLatch raceSignal = new CountDownLatch( 1 );
private SyncNotifierDirectoryFactory directoryFactory;
private WritableTestDatabaseIndex luceneIndex;
private ExecutorService workers;


@BeforeClass
public static void initExecutors()
{
workers = Executors.newFixedThreadPool( THREAD_NUMBER );
}

@AfterClass
public static void shutDownExecutor()
{
workers.shutdownNow();
}

@Before
public void setUp() throws IOException
{
directoryFactory = new SyncNotifierDirectoryFactory( closeRaceSignal );
directoryFactory = new SyncNotifierDirectoryFactory( raceSignal );
luceneIndex = createTestLuceneIndex( directoryFactory, testDir.directory() );
workers = Executors.newFixedThreadPool( THREAD_NUMBER );
}

@After
public void tearDown()
{
workers.shutdownNow();
directoryFactory.close();
}

@Test( timeout = 10000 )
@RepeatRule.Repeat( times = 5 )
@RepeatRule.Repeat( times = 2 )
public void testSaveCallCommitAndCloseFromMultipleThreads() throws Exception
{
generateInitialData();
List<Future<?>> closeFutures = submitCloseTasks( closeRaceSignal );
Supplier<Runnable> closeTaskSupplier = () -> createConcurrentCloseTask( raceSignal );
List<Future<?>> closeFutures = submitTasks( closeTaskSupplier );

for ( Future<?> closeFuture : closeFutures )
{
Expand All @@ -103,6 +118,22 @@ public void testSaveCallCommitAndCloseFromMultipleThreads() throws Exception
assertFalse( luceneIndex.isOpen() );
}

@Test( timeout = 10000 )
@RepeatRule.Repeat( times = 2 )
public void saveCallCloseAndDropFromMultipleThreads() throws Exception
{
generateInitialData();
Supplier<Runnable> dropTaskSupplier = () -> createConcurrentDropTask( raceSignal );
List<Future<?>> futures = submitTasks( dropTaskSupplier );

for ( Future<?> future : futures )
{
future.get();
}

assertFalse( luceneIndex.isOpen() );
}

private static WritableTestDatabaseIndex createTestLuceneIndex( DirectoryFactory dirFactory, File folder ) throws IOException
{
DefaultFileSystemAbstraction fs = new DefaultFileSystemAbstraction();
Expand All @@ -113,15 +144,15 @@ private static WritableTestDatabaseIndex createTestLuceneIndex( DirectoryFactory
return index;
}

private List<Future<?>> submitCloseTasks( CountDownLatch closeRaceSignal )
private List<Future<?>> submitTasks( Supplier<Runnable> taskSupplier )
{
List<Future<?>> closeFutures = new ArrayList<>( THREAD_NUMBER );
closeFutures.add( workers.submit( createMainCloseTask() ) );
List<Future<?>> futures = new ArrayList<>( THREAD_NUMBER );
futures.add( workers.submit( createMainCloseTask() ) );
for ( int i = 0; i < THREAD_NUMBER - 1; i++ )
{
closeFutures.add( workers.submit( createConcurrentCloseTask( closeRaceSignal ) ) );
futures.add( workers.submit( taskSupplier.get() ) );
}
return closeFutures;
return futures;
}

private void generateInitialData() throws IOException
Expand All @@ -133,6 +164,22 @@ private void generateInitialData() throws IOException
}
}

private Runnable createConcurrentDropTask( CountDownLatch dropRaceSignal )
{
return () -> {
try
{
dropRaceSignal.await();
Thread.yield();
luceneIndex.drop();
}
catch ( Exception e )
{
throw new RuntimeException( e );
}
};
}

private Runnable createConcurrentCloseTask( CountDownLatch closeRaceSignal )
{
return () -> {
Expand Down

0 comments on commit 62e3196

Please sign in to comment.