From 62e31964943969e839c04fd046b702d1095c2a9a Mon Sep 17 00:00:00 2001 From: Mikhaylo Demianenko Date: Mon, 26 Sep 2016 11:25:00 +0200 Subject: [PATCH] Guard index drop with commitCloseLock Guard index drop with same commit close lock since drop will try to close index first. Preventing lucene race during two phase commit inside lucene when writer can't find corresponding commit call: 'cannot close: prepareCommit was already called with no corresponding call to commit'. Remove unused method. --- .../index/impl/lucene/legacy/FullTxData.java | 24 ------ .../index/WritableAbstractDatabaseIndex.java | 10 ++- .../index/DatabaseIndexIntegrationTest.java | 73 +++++++++++++++---- 3 files changed, 69 insertions(+), 38 deletions(-) diff --git a/community/lucene-index/src/main/java/org/neo4j/index/impl/lucene/legacy/FullTxData.java b/community/lucene-index/src/main/java/org/neo4j/index/impl/lucene/legacy/FullTxData.java index 27e95ce4c82f4..c1203a7fb8c87 100644 --- a/community/lucene-index/src/main/java/org/neo4j/index/impl/lucene/legacy/FullTxData.java +++ b/community/lucene-index/src/main/java/org/neo4j/index/impl/lucene/legacy/FullTxData.java @@ -402,30 +402,6 @@ private IndexSearcher searcher( boolean allowRefreshSearcher ) return this.searcher; } - private static void safeClose( Object object ) - { - if ( object == null ) - { - return; - } - - try - { - if ( object instanceof IndexWriter ) - { - ( ( IndexWriter ) object ).close(); - } - else if ( object instanceof IndexReader ) - { - ( ( IndexReader ) object ).close(); - } - } - catch ( IOException e ) - { - // Ok - } - } - @Override IndexSearcher asSearcher( TxDataHolder holder, QueryContext context ) { diff --git a/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/WritableAbstractDatabaseIndex.java b/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/WritableAbstractDatabaseIndex.java index 56e88393c4331..2f913ca104e7b 100644 --- a/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/WritableAbstractDatabaseIndex.java +++ b/community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/WritableAbstractDatabaseIndex.java @@ -106,7 +106,15 @@ public boolean isValid() @Override public void drop() throws IOException { - luceneIndex.drop(); + commitCloseLock.lock(); + try + { + luceneIndex.drop(); + } + finally + { + commitCloseLock.unlock(); + } } /** diff --git a/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/index/DatabaseIndexIntegrationTest.java b/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/index/DatabaseIndexIntegrationTest.java index 90762bf918d7d..39a9b4fa16c1d 100644 --- a/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/index/DatabaseIndexIntegrationTest.java +++ b/community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/index/DatabaseIndexIntegrationTest.java @@ -32,7 +32,9 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -45,6 +47,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.function.Supplier; import java.util.zip.ZipOutputStream; import org.neo4j.io.fs.DefaultFileSystemAbstraction; @@ -62,38 +65,50 @@ public class DatabaseIndexIntegrationTest { private static final int THREAD_NUMBER = 5; + private static ExecutorService workers; + @Rule public TargetDirectory.TestDirectory testDir = TargetDirectory.testDirForTest( getClass() ); @Rule public RepeatRule repeatRule = new RepeatRule(); - private final CountDownLatch closeRaceSignal = new CountDownLatch( 1 ); - + private final CountDownLatch raceSignal = new CountDownLatch( 1 ); private SyncNotifierDirectoryFactory directoryFactory; private WritableTestDatabaseIndex luceneIndex; - private ExecutorService workers; + + + @BeforeClass + public static void initExecutors() + { + workers = Executors.newFixedThreadPool( THREAD_NUMBER ); + } + + @AfterClass + public static void shutDownExecutor() + { + workers.shutdownNow(); + } @Before public void setUp() throws IOException { - directoryFactory = new SyncNotifierDirectoryFactory( closeRaceSignal ); + directoryFactory = new SyncNotifierDirectoryFactory( raceSignal ); luceneIndex = createTestLuceneIndex( directoryFactory, testDir.directory() ); - workers = Executors.newFixedThreadPool( THREAD_NUMBER ); } @After public void tearDown() { - workers.shutdownNow(); directoryFactory.close(); } @Test( timeout = 10000 ) - @RepeatRule.Repeat( times = 5 ) + @RepeatRule.Repeat( times = 2 ) public void testSaveCallCommitAndCloseFromMultipleThreads() throws Exception { generateInitialData(); - List> closeFutures = submitCloseTasks( closeRaceSignal ); + Supplier closeTaskSupplier = () -> createConcurrentCloseTask( raceSignal ); + List> closeFutures = submitTasks( closeTaskSupplier ); for ( Future closeFuture : closeFutures ) { @@ -103,6 +118,22 @@ public void testSaveCallCommitAndCloseFromMultipleThreads() throws Exception assertFalse( luceneIndex.isOpen() ); } + @Test( timeout = 10000 ) + @RepeatRule.Repeat( times = 2 ) + public void saveCallCloseAndDropFromMultipleThreads() throws Exception + { + generateInitialData(); + Supplier dropTaskSupplier = () -> createConcurrentDropTask( raceSignal ); + List> futures = submitTasks( dropTaskSupplier ); + + for ( Future future : futures ) + { + future.get(); + } + + assertFalse( luceneIndex.isOpen() ); + } + private static WritableTestDatabaseIndex createTestLuceneIndex( DirectoryFactory dirFactory, File folder ) throws IOException { DefaultFileSystemAbstraction fs = new DefaultFileSystemAbstraction(); @@ -113,15 +144,15 @@ private static WritableTestDatabaseIndex createTestLuceneIndex( DirectoryFactory return index; } - private List> submitCloseTasks( CountDownLatch closeRaceSignal ) + private List> submitTasks( Supplier taskSupplier ) { - List> closeFutures = new ArrayList<>( THREAD_NUMBER ); - closeFutures.add( workers.submit( createMainCloseTask() ) ); + List> futures = new ArrayList<>( THREAD_NUMBER ); + futures.add( workers.submit( createMainCloseTask() ) ); for ( int i = 0; i < THREAD_NUMBER - 1; i++ ) { - closeFutures.add( workers.submit( createConcurrentCloseTask( closeRaceSignal ) ) ); + futures.add( workers.submit( taskSupplier.get() ) ); } - return closeFutures; + return futures; } private void generateInitialData() throws IOException @@ -133,6 +164,22 @@ private void generateInitialData() throws IOException } } + private Runnable createConcurrentDropTask( CountDownLatch dropRaceSignal ) + { + return () -> { + try + { + dropRaceSignal.await(); + Thread.yield(); + luceneIndex.drop(); + } + catch ( Exception e ) + { + throw new RuntimeException( e ); + } + }; + } + private Runnable createConcurrentCloseTask( CountDownLatch closeRaceSignal ) { return () -> {