diff --git a/community/community-it/index-it/src/test/java/org/neo4j/kernel/api/impl/fulltext/ConcurrentLuceneFulltextUpdaterTest.java b/community/community-it/index-it/src/test/java/org/neo4j/kernel/api/impl/fulltext/ConcurrentLuceneFulltextUpdaterTest.java
index 8a62ebce90dc..b6588bb936b9 100644
--- a/community/community-it/index-it/src/test/java/org/neo4j/kernel/api/impl/fulltext/ConcurrentLuceneFulltextUpdaterTest.java
+++ b/community/community-it/index-it/src/test/java/org/neo4j/kernel/api/impl/fulltext/ConcurrentLuceneFulltextUpdaterTest.java
@@ -19,22 +19,37 @@
  */
 package org.neo4j.kernel.api.impl.fulltext;

+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;

 import org.neo4j.function.ThrowingAction;
-import org.neo4j.graphdb.Label;
 import org.neo4j.graphdb.Transaction;
 import org.neo4j.internal.kernel.api.IndexReference;
 import org.neo4j.internal.kernel.api.SchemaWrite;
 import org.neo4j.internal.kernel.api.schema.SchemaDescriptor;
 import org.neo4j.kernel.api.KernelTransaction;
 import org.neo4j.kernel.impl.api.KernelTransactionImplementation;
+import org.neo4j.logging.FormattedLogProvider;
+import org.neo4j.logging.Level;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.async.AsyncLogEvent;
+import org.neo4j.logging.async.AsyncLogProvider;
+import org.neo4j.scheduler.Group;
+import org.neo4j.scheduler.JobHandle;
+import org.neo4j.scheduler.JobScheduler;
 import org.neo4j.storageengine.api.schema.IndexDescriptorFactory;
 import org.neo4j.test.Race;
 import org.neo4j.test.rule.RepeatRule;
+import org.neo4j.util.concurrent.AsyncEvents;

 import static org.junit.Assert.assertEquals;
 import static org.neo4j.storageengine.api.EntityType.NODE;
@@ -45,21 +60,47 @@
  */
 public class ConcurrentLuceneFulltextUpdaterTest extends LuceneFulltextTestSupport
 {
-    private final int aliceThreads = 10;
-    private final int bobThreads = 10;
-    private final int nodesCreatedPerThread = 10;
+    private final int aliceThreads = 1;
+    private final int bobThreads = 1;
+    private final int nodesCreatedPerThread = 500;
     private Race race;
+    private CountDownLatch aliceLatch = new CountDownLatch( 2 );
+    private CountDownLatch bobLatch = new CountDownLatch( 2 );
+    private JobHandle handle;
+    private OutputStream logFile;
+    private AsyncEvents<AsyncLogEvent> events;
+    private Log log;

     @Override
     protected RepeatRule createRepeatRule()
     {
-        return new RepeatRule( false, 3 );
+        return new RepeatRule( false, 1 );
     }

     @Before
-    public void createRace()
+    public void createRace() throws Exception
     {
         race = new Race();
+        JobScheduler scheduler = db.resolveDependency( JobScheduler.class );
+        logFile = new FileOutputStream( db.databaseLayout().file( "fts-events.txt" ) );
+        FormattedLogProvider logProvider = FormattedLogProvider.withDefaultLogLevel( Level.DEBUG ).toOutputStream( logFile );
+        events = new AsyncEvents<>( AsyncLogEvent::process, AsyncEvents.Monitor.NONE );
+        handle = scheduler.schedule( Group.FILE_IO_HELPER, events );
+        events.awaitStartup();
+        AsyncLogProvider asyncLogProvider = new AsyncLogProvider( events, logProvider );
+        log = asyncLogProvider.getLog( ConcurrentLuceneFulltextUpdaterTest.class );
+        FulltextIndexAccessor.TRACE_LOG = asyncLogProvider.getLog( FulltextIndexAccessor.class );
+        FulltextIndexPopulator.TRACE_LOG = asyncLogProvider.getLog( FulltextIndexPopulator.class );
+    }
+
+    @After
+    public void stopLogger() throws IOException
+    {
+        handle.cancel( false );
+        events.shutdown();
+        events.awaitTermination();
+        logFile.flush();
+        logFile.close();
     }

     private SchemaDescriptor getNewDescriptor( String[] entityTokens )
@@ -91,13 +132,25 @@ private void raceContestantsAndVerifyResults( SchemaDescriptor newDescriptor, Ru
         race.addContestant( changeConfig );
         race.addContestants( bobThreads, bobWork );
         race.go();
-        Thread.sleep( 100 );
         await( IndexDescriptorFactory.forSchema( newDescriptor, Optional.of( "nodes" ), FulltextIndexProviderFactory.DESCRIPTOR ) );
         try ( Transaction tx = db.beginTx() )
         {
             KernelTransaction ktx = kernelTransaction( tx );
             ScoreEntityIterator bob = fulltextAdapter.query( ktx, "nodes", "bob" );
-            assertEquals( bobThreads * nodesCreatedPerThread, bob.stream().count() );
+            List<ScoreEntityIterator.ScoreEntry> list = bob.stream().collect( Collectors.toList() );
+            try
+            {
+                assertEquals( bobThreads * nodesCreatedPerThread, list.size() );
+            }
+            catch ( Throwable e )
+            {
+                log.debug( "Nodes found in query for bob:" );
+                for ( ScoreEntityIterator.ScoreEntry entry : list )
+                {
+                    log.debug( "\t" + db.getNodeById( entry.entityId() ) );
+                }
+                throw e;
+            }
             ScoreEntityIterator alice = fulltextAdapter.query( ktx, "nodes", "alice" );
             assertEquals( 0, alice.stream().count() );
         }
@@ -111,9 +164,12 @@ private Runnable work( int iterations, ThrowingAction work )
         {
             for ( int i = 0; i < iterations; i++ )
             {
+                Thread.yield();
                 try ( Transaction tx = db.beginTx() )
                 {
+                    Thread.yield();
                     work.apply();
+                    Thread.yield();
                     tx.success();
                 }
             }
@@ -129,12 +185,15 @@ private ThrowingAction dropAndReCreateIndex( IndexReference descripto
     {
         return () ->
         {
+            aliceLatch.await();
+            bobLatch.await();
             try ( KernelTransactionImplementation transaction = getKernelTransaction() )
             {
                 SchemaWrite schemaWrite = transaction.schemaWrite();
                 schemaWrite.indexDrop( descriptor );
                 schemaWrite.indexCreate( newDescriptor, FulltextIndexProviderFactory.DESCRIPTOR.name(), Optional.of( "nodes" ) );
                 transaction.success();
+                log.debug( "drop and recreate" );
             }
         };
     }
@@ -142,14 +201,23 @@ private ThrowingAction dropAndReCreateIndex( IndexReference descripto
     @Test
     public void labelledNodesCoreAPI() throws Throwable
     {
-        Label label = Label.label( "LABEL" );
-        String[] entityTokens = {label.name()};
+        String[] entityTokens = {LABEL.name()};
         SchemaDescriptor descriptor = getExistingDescriptor( entityTokens );
         SchemaDescriptor newDescriptor = getNewDescriptor( entityTokens );
         IndexReference initialIndex = createInitialIndex( descriptor );

-        Runnable aliceWork = work( nodesCreatedPerThread, () -> db.getNodeById( createNodeIndexableByPropertyValue( LABEL, "alice" ) ).addLabel( label ) );
-        Runnable bobWork = work( nodesCreatedPerThread, () -> db.getNodeById( createNodeWithProperty( LABEL, "otherProp", "bob" ) ).addLabel( label ) );
+        Runnable aliceWork = work( nodesCreatedPerThread, () ->
+        {
+            db.getNodeById( createNodeIndexableByPropertyValue( LABEL, "alice" ) );
+            log.debug( "core api created an alice" );
+            aliceLatch.countDown();
+        } );
+        Runnable bobWork = work( nodesCreatedPerThread, () ->
+        {
+            db.getNodeById( createNodeWithProperty( LABEL, "otherProp", "bob" ) );
+            log.debug( "core api created a bob" );
+            bobLatch.countDown();
+        } );
         Runnable changeConfig = work( 1, dropAndReCreateIndex( initialIndex, newDescriptor ) );
         raceContestantsAndVerifyResults( newDescriptor, aliceWork, changeConfig, bobWork );
     }
@@ -157,14 +225,23 @@ public void labelledNodesCoreAPI() throws Throwable
     @Test
     public void labelledNodesCypherCurrent() throws Throwable
     {
-        Label label = Label.label( "LABEL" );
-        String[] entityTokens = {label.name()};
+        String[] entityTokens = {LABEL.name()};
         SchemaDescriptor descriptor = getExistingDescriptor( entityTokens );
         SchemaDescriptor newDescriptor = getNewDescriptor( entityTokens );
         IndexReference initialIndex = createInitialIndex( descriptor );

-        Runnable aliceWork = work( nodesCreatedPerThread, () -> db.execute( "create (:LABEL {" + PROP + ": \"alice\"})" ).close() );
-        Runnable bobWork = work( nodesCreatedPerThread, () -> db.execute( "create (:LABEL {otherProp: \"bob\"})" ).close() );
+        Runnable aliceWork = work( nodesCreatedPerThread, () ->
+        {
+            db.execute( "create (:LABEL {" + PROP + ": \"alice\"})" ).close();
+            log.debug( "cypher current created an alice" );
+            aliceLatch.countDown();
+        } );
+        Runnable bobWork = work( nodesCreatedPerThread, () ->
+        {
+            db.execute( "create (:LABEL {otherProp: \"bob\"})" ).close();
+            log.debug( "cypher current created a bob" );
+            bobLatch.countDown();
+        } );
         Runnable changeConfig = work( 1, dropAndReCreateIndex( initialIndex, newDescriptor ) );
         raceContestantsAndVerifyResults( newDescriptor, aliceWork, changeConfig, bobWork );
     }
@@ -172,14 +249,23 @@ public void labelledNodesCypherCurrent() throws Throwable
     @Test
     public void labelledNodesCypher31() throws Throwable
     {
-        Label label = Label.label( "LABEL" );
-        String[] entityTokens = {label.name()};
+        String[] entityTokens = {LABEL.name()};
         SchemaDescriptor descriptor = getExistingDescriptor( entityTokens );
         SchemaDescriptor newDescriptor = getNewDescriptor( entityTokens );
         IndexReference initialIndex = createInitialIndex( descriptor );

-        Runnable aliceWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER 3.1 create (:LABEL {" + PROP + ": \"alice\"})" ).close() );
-        Runnable bobWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER 3.1 create (:LABEL {otherProp: \"bob\"})" ).close() );
+        Runnable aliceWork = work( nodesCreatedPerThread, () ->
+        {
+            db.execute( "CYPHER 3.1 create (:LABEL {" + PROP + ": \"alice\"})" ).close();
+            log.debug( "cypher 3.1 created an alice" );
+            aliceLatch.countDown();
+        } );
+        Runnable bobWork = work( nodesCreatedPerThread, () ->
+        {
+            db.execute( "CYPHER 3.1 create (:LABEL {otherProp: \"bob\"})" ).close();
+            log.debug( "cypher 3.1 created a bob" );
+            bobLatch.countDown();
+        } );
         Runnable changeConfig = work( 1, dropAndReCreateIndex( initialIndex, newDescriptor ) );
         raceContestantsAndVerifyResults( newDescriptor, aliceWork, changeConfig, bobWork );
     }
@@ -187,14 +273,23 @@ public void labelledNodesCypher31() throws Throwable
     @Test
     public void labelledNodesCypher23() throws Throwable
     {
-        Label label = Label.label( "LABEL" );
-        String[] entityTokens = {label.name()};
+        String[] entityTokens = {LABEL.name()};
         SchemaDescriptor descriptor = getExistingDescriptor( entityTokens );
         SchemaDescriptor newDescriptor = getNewDescriptor( entityTokens );
         IndexReference initialIndex = createInitialIndex( descriptor );

-        Runnable aliceWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER 2.3 create (:LABEL {" + PROP + ": \"alice\"})" ).close() );
-        Runnable bobWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER 2.3 create (:LABEL {otherProp: \"bob\"})" ).close() );
+        Runnable aliceWork = work( nodesCreatedPerThread, () ->
+        {
+            db.execute( "CYPHER 2.3 create (:LABEL {" + PROP + ": \"alice\"})" ).close();
+            log.debug( "cypher 2.3 created an alice" );
+            aliceLatch.countDown();
+        } );
+        Runnable bobWork = work( nodesCreatedPerThread, () ->
+        {
\"bob\"})" ).close(); + log.debug( "cypher 2.3 created a bob" ); + bobLatch.countDown(); + } ); Runnable changeConfig = work( 1, dropAndReCreateIndex( initialIndex, newDescriptor ) ); raceContestantsAndVerifyResults( newDescriptor, aliceWork, changeConfig, bobWork ); } @@ -202,14 +297,23 @@ public void labelledNodesCypher23() throws Throwable @Test public void labelledNodesCypherRule() throws Throwable { - Label label = Label.label( "LABEL" ); - String[] entityTokens = {label.name()}; + String[] entityTokens = {LABEL.name()}; SchemaDescriptor descriptor = getExistingDescriptor( entityTokens ); SchemaDescriptor newDescriptor = getNewDescriptor( entityTokens ); IndexReference initialIndex = createInitialIndex( descriptor ); - Runnable aliceWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER planner=rule create (:LABEL {" + PROP + ": \"alice\"})" ).close() ); - Runnable bobWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER planner=rule create (:LABEL {otherProp: \"bob\"})" ).close() ); + Runnable aliceWork = work( nodesCreatedPerThread, () -> + { + db.execute( "CYPHER planner=rule create (:LABEL {" + PROP + ": \"alice\"})" ).close(); + log.debug( "cypher rule created an alice" ); + aliceLatch.countDown(); + } ); + Runnable bobWork = work( nodesCreatedPerThread, () -> + { + db.execute( "CYPHER planner=rule create (:LABEL {otherProp: \"bob\"})" ).close(); + log.debug( "cypher rule created a bob" ); + bobLatch.countDown(); + } ); Runnable changeConfig = work( 1, dropAndReCreateIndex( initialIndex, newDescriptor ) ); raceContestantsAndVerifyResults( newDescriptor, aliceWork, changeConfig, bobWork ); } diff --git a/community/concurrent/src/main/java/org/neo4j/util/concurrent/AsyncEvents.java b/community/concurrent/src/main/java/org/neo4j/util/concurrent/AsyncEvents.java index 710e19bf4ec8..da1711b98211 100644 --- a/community/concurrent/src/main/java/org/neo4j/util/concurrent/AsyncEvents.java +++ b/community/concurrent/src/main/java/org/neo4j/util/concurrent/AsyncEvents.java @@ -25,16 +25,16 @@ /** * {@code AsyncEvents} is a mechanism for queueing up events to be processed asynchronously in a background thread. - * + *

* The {@code AsyncEvents} object implements {@link Runnable}, so it can be passed to a thread pool, or given to a * dedicated thread. The runnable will then occupy a thread and dedicate it to background processing of events, until * the {@code AsyncEvents} is {@link AsyncEvents#shutdown()}. - * + *

* If events are sent to an {@code AsyncEvents} that has been shut down, then those events will be processed in the * foreground as a fall-back. - * + *

* Note, however, that no events are processed until the background thread is started. - * + *

* The {@code AsyncEvents} is given a {@link Consumer} of the specified event type upon construction, and will use it * for doing the actual processing of events once they have been collected. * @@ -46,7 +46,9 @@ public interface Monitor { void eventCount( long count ); - Monitor NONE = count -> {}; + Monitor NONE = count -> + { + }; } // TODO use VarHandles in Java 9 @@ -165,7 +167,7 @@ private AsyncEvent reverseAndStripEndMark( AsyncEvent events ) /** * Initiate the shut down process of this {@code AsyncEvents} instance. - * + *

      * This call does not block or otherwise wait for the background thread to terminate.
      */
     public void shutdown()
diff --git a/community/fulltext-index/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextIndexAccessor.java b/community/fulltext-index/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextIndexAccessor.java
index 8773f5019c69..297520b4d543 100644
--- a/community/fulltext-index/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextIndexAccessor.java
+++ b/community/fulltext-index/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextIndexAccessor.java
@@ -19,6 +19,8 @@
  */
 package org.neo4j.kernel.api.impl.fulltext;

+import org.apache.lucene.document.Document;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;

@@ -27,6 +29,8 @@
 import org.neo4j.kernel.api.index.IndexUpdater;
 import org.neo4j.kernel.api.index.NodePropertyAccessor;
 import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.NullLog;
 import org.neo4j.values.storable.Value;

 import static org.neo4j.kernel.api.impl.fulltext.LuceneFulltextDocumentStructure.documentRepresentingProperties;
@@ -34,6 +38,8 @@

 public class FulltextIndexAccessor extends AbstractLuceneIndexAccessor
 {
+    static Log TRACE_LOG = NullLog.getInstance();
+
     private final IndexUpdateSink indexUpdateSink;
     private final FulltextIndexDescriptor descriptor;
     private final Runnable onClose;
@@ -116,7 +122,12 @@ protected void addIdempotent( long entityId, Value[] values )
         {
             try
             {
-                writer.updateDocument( newTermForChangeOrRemove( entityId ), documentRepresentingProperties( entityId, descriptor.propertyNames(), values ) );
+                Document document = documentRepresentingProperties( entityId, descriptor.propertyNames(), values );
+                if ( TRACE_LOG.isDebugEnabled() )
+                {
+                    TRACE_LOG.debug( "updater add idempotent: %s", document.toString() );
+                }
+                writer.updateDocument( newTermForChangeOrRemove( entityId ), document );
             }
             catch ( IOException e )
             {
@@ -129,7 +140,12 @@ public void add( long entityId, Value[] values )
         {
             try
             {
-                writer.addDocument( documentRepresentingProperties( entityId, descriptor.propertyNames(), values ) );
+                Document document = documentRepresentingProperties( entityId, descriptor.propertyNames(), values );
+                if ( TRACE_LOG.isDebugEnabled() )
+                {
+                    TRACE_LOG.debug( "updater add: %s", document.toString() );
+                }
+                writer.addDocument( document );
             }
             catch ( IOException e )
             {
diff --git a/community/fulltext-index/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextIndexPopulator.java b/community/fulltext-index/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextIndexPopulator.java
index e606b65113c4..6b2d243b80e0 100644
--- a/community/fulltext-index/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextIndexPopulator.java
+++ b/community/fulltext-index/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextIndexPopulator.java
@@ -31,10 +31,14 @@
 import org.neo4j.kernel.api.index.IndexEntryUpdate;
 import org.neo4j.kernel.api.index.IndexUpdater;
 import org.neo4j.kernel.api.index.NodePropertyAccessor;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.NullLog;
 import org.neo4j.storageengine.api.schema.IndexSample;

 public class FulltextIndexPopulator extends LuceneIndexPopulator>
 {
+    static Log TRACE_LOG = NullLog.getInstance();
+
     private final FulltextIndexDescriptor descriptor;
     private final ThrowingAction descriptorCreateAction;

@@ -67,6 +71,7 @@ public void add( Collection<? extends IndexEntryUpdate<?>> updates )
     {
         for ( IndexEntryUpdate<?> update : updates )
         {
+            TRACE_LOG.debug( "populator add: %s", update );
             writer.updateDocument( LuceneFulltextDocumentStructure.newTermForChangeOrRemove( update.getEntityId() ), updateAsDocument( update ) );
         }
     }
@@ -110,6 +115,7 @@ private class PopulatingFulltextIndexUpdater implements IndexUpdater
     @Override
     public void process( IndexEntryUpdate<?> update )
     {
+        TRACE_LOG.debug( "populating updater process: %s", update );
         assert update.indexKey().schema().equals( descriptor.schema() );
         try
         {