Skip to content

Commit

Permalink
Make the concurrentLuceneFulltextUpdaterTest hopefully more likely to…
Browse files Browse the repository at this point in the history
… catch any data race errors, and capture some key events in a log if they happen.
  • Loading branch information
chrisvest committed Nov 14, 2018
1 parent cac62fd commit 97dfe7e
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 36 deletions.
Expand Up @@ -19,22 +19,37 @@
*/
package org.neo4j.kernel.api.impl.fulltext;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import org.neo4j.function.ThrowingAction;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Transaction;
import org.neo4j.internal.kernel.api.IndexReference;
import org.neo4j.internal.kernel.api.SchemaWrite;
import org.neo4j.internal.kernel.api.schema.SchemaDescriptor;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.impl.api.KernelTransactionImplementation;
import org.neo4j.logging.FormattedLogProvider;
import org.neo4j.logging.Level;
import org.neo4j.logging.Log;
import org.neo4j.logging.async.AsyncLogEvent;
import org.neo4j.logging.async.AsyncLogProvider;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobHandle;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.schema.IndexDescriptorFactory;
import org.neo4j.test.Race;
import org.neo4j.test.rule.RepeatRule;
import org.neo4j.util.concurrent.AsyncEvents;

import static org.junit.Assert.assertEquals;
import static org.neo4j.storageengine.api.EntityType.NODE;
Expand All @@ -45,21 +60,47 @@
*/
public class ConcurrentLuceneFulltextUpdaterTest extends LuceneFulltextTestSupport
{
private final int aliceThreads = 10;
private final int bobThreads = 10;
private final int nodesCreatedPerThread = 10;
private final int aliceThreads = 1;
private final int bobThreads = 1;
private final int nodesCreatedPerThread = 500;
private Race race;
private CountDownLatch aliceLatch = new CountDownLatch( 2 );
private CountDownLatch bobLatch = new CountDownLatch( 2 );
private JobHandle handle;
private OutputStream logFile;
private AsyncEvents<AsyncLogEvent> events;
private Log log;

@Override
protected RepeatRule createRepeatRule()
{
return new RepeatRule( false, 3 );
return new RepeatRule( false, 1 );
}

@Before
public void createRace()
public void createRace() throws Exception
{
race = new Race();
JobScheduler scheduler = db.resolveDependency( JobScheduler.class );
logFile = new FileOutputStream( db.databaseLayout().file( "fts-events.txt" ) );
FormattedLogProvider logProvider = FormattedLogProvider.withDefaultLogLevel( Level.DEBUG ).toOutputStream( logFile );
events = new AsyncEvents<>( AsyncLogEvent::process, AsyncEvents.Monitor.NONE );
handle = scheduler.schedule( Group.FILE_IO_HELPER, events );
events.awaitStartup();
AsyncLogProvider asyncLogProvider = new AsyncLogProvider( events, logProvider );
log = asyncLogProvider.getLog( ConcurrentLuceneFulltextUpdaterTest.class );
FulltextIndexAccessor.TRACE_LOG = asyncLogProvider.getLog( FulltextIndexAccessor.class );
FulltextIndexPopulator.TRACE_LOG = asyncLogProvider.getLog( FulltextIndexPopulator.class );
}

@After
public void stopLogger() throws IOException
{
handle.cancel( false );
events.shutdown();
events.awaitTermination();
logFile.flush();
logFile.close();
}

private SchemaDescriptor getNewDescriptor( String[] entityTokens )
Expand Down Expand Up @@ -91,13 +132,25 @@ private void raceContestantsAndVerifyResults( SchemaDescriptor newDescriptor, Ru
race.addContestant( changeConfig );
race.addContestants( bobThreads, bobWork );
race.go();
Thread.sleep( 100 );
await( IndexDescriptorFactory.forSchema( newDescriptor, Optional.of( "nodes" ), FulltextIndexProviderFactory.DESCRIPTOR ) );
try ( Transaction tx = db.beginTx() )
{
KernelTransaction ktx = kernelTransaction( tx );
ScoreEntityIterator bob = fulltextAdapter.query( ktx, "nodes", "bob" );
assertEquals( bobThreads * nodesCreatedPerThread, bob.stream().count() );
List<ScoreEntityIterator.ScoreEntry> list = bob.stream().collect( Collectors.toList() );
try
{
assertEquals( bobThreads * nodesCreatedPerThread, list.size() );
}
catch ( Throwable e )
{
log.debug( "Nodes found in query for bob:" );
for ( ScoreEntityIterator.ScoreEntry entry : list )
{
log.debug( "\t" + db.getNodeById( entry.entityId() ) );
}
throw e;
}
ScoreEntityIterator alice = fulltextAdapter.query( ktx, "nodes", "alice" );
assertEquals( 0, alice.stream().count() );
}
Expand All @@ -111,9 +164,12 @@ private Runnable work( int iterations, ThrowingAction<Exception> work )
{
for ( int i = 0; i < iterations; i++ )
{
Thread.yield();
try ( Transaction tx = db.beginTx() )
{
Thread.yield();
work.apply();
Thread.yield();
tx.success();
}
}
Expand All @@ -129,87 +185,135 @@ private ThrowingAction<Exception> dropAndReCreateIndex( IndexReference descripto
{
return () ->
{
aliceLatch.await();
bobLatch.await();
try ( KernelTransactionImplementation transaction = getKernelTransaction() )
{
SchemaWrite schemaWrite = transaction.schemaWrite();
schemaWrite.indexDrop( descriptor );
schemaWrite.indexCreate( newDescriptor, FulltextIndexProviderFactory.DESCRIPTOR.name(), Optional.of( "nodes" ) );
transaction.success();
log.debug( "drop an recreate" );
}
};
}

@Test
public void labelledNodesCoreAPI() throws Throwable
{
Label label = Label.label( "LABEL" );
String[] entityTokens = {label.name()};
String[] entityTokens = {LABEL.name()};
SchemaDescriptor descriptor = getExistingDescriptor( entityTokens );
SchemaDescriptor newDescriptor = getNewDescriptor( entityTokens );
IndexReference initialIndex = createInitialIndex( descriptor );

Runnable aliceWork = work( nodesCreatedPerThread, () -> db.getNodeById( createNodeIndexableByPropertyValue( LABEL, "alice" ) ).addLabel( label ) );
Runnable bobWork = work( nodesCreatedPerThread, () -> db.getNodeById( createNodeWithProperty( LABEL, "otherProp", "bob" ) ).addLabel( label ) );
Runnable aliceWork = work( nodesCreatedPerThread, () ->
{
db.getNodeById( createNodeIndexableByPropertyValue( LABEL, "alice" ) );
log.debug( "core api created an alice" );
aliceLatch.countDown();
} );
Runnable bobWork = work( nodesCreatedPerThread, () ->
{
db.getNodeById( createNodeWithProperty( LABEL, "otherProp", "bob" ) );
log.debug( "core api created a bob" );
bobLatch.countDown();
} );
Runnable changeConfig = work( 1, dropAndReCreateIndex( initialIndex, newDescriptor ) );
raceContestantsAndVerifyResults( newDescriptor, aliceWork, changeConfig, bobWork );
}

@Test
public void labelledNodesCypherCurrent() throws Throwable
{
Label label = Label.label( "LABEL" );
String[] entityTokens = {label.name()};
String[] entityTokens = {LABEL.name()};
SchemaDescriptor descriptor = getExistingDescriptor( entityTokens );
SchemaDescriptor newDescriptor = getNewDescriptor( entityTokens );
IndexReference initialIndex = createInitialIndex( descriptor );

Runnable aliceWork = work( nodesCreatedPerThread, () -> db.execute( "create (:LABEL {" + PROP + ": \"alice\"})" ).close() );
Runnable bobWork = work( nodesCreatedPerThread, () -> db.execute( "create (:LABEL {otherProp: \"bob\"})" ).close() );
Runnable aliceWork = work( nodesCreatedPerThread, () ->
{
db.execute( "create (:LABEL {" + PROP + ": \"alice\"})" ).close();
log.debug( "cypher current created an alice" );
aliceLatch.countDown();
} );
Runnable bobWork = work( nodesCreatedPerThread, () ->
{
db.execute( "create (:LABEL {otherProp: \"bob\"})" ).close();
log.debug( "cypher current created a bob" );
bobLatch.countDown();
} );
Runnable changeConfig = work( 1, dropAndReCreateIndex( initialIndex, newDescriptor ) );
raceContestantsAndVerifyResults( newDescriptor, aliceWork, changeConfig, bobWork );
}

@Test
public void labelledNodesCypher31() throws Throwable
{
Label label = Label.label( "LABEL" );
String[] entityTokens = {label.name()};
String[] entityTokens = {LABEL.name()};
SchemaDescriptor descriptor = getExistingDescriptor( entityTokens );
SchemaDescriptor newDescriptor = getNewDescriptor( entityTokens );
IndexReference initialIndex = createInitialIndex( descriptor );

Runnable aliceWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER 3.1 create (:LABEL {" + PROP + ": \"alice\"})" ).close() );
Runnable bobWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER 3.1 create (:LABEL {otherProp: \"bob\"})" ).close() );
Runnable aliceWork = work( nodesCreatedPerThread, () ->
{
db.execute( "CYPHER 3.1 create (:LABEL {" + PROP + ": \"alice\"})" ).close();
log.debug( "cypher 3.1 created an alice" );
aliceLatch.countDown();
} );
Runnable bobWork = work( nodesCreatedPerThread, () ->
{
db.execute( "CYPHER 3.1 create (:LABEL {otherProp: \"bob\"})" ).close();
log.debug( "cypher 3.1 created a bob" );
bobLatch.countDown();
} );
Runnable changeConfig = work( 1, dropAndReCreateIndex( initialIndex, newDescriptor ) );
raceContestantsAndVerifyResults( newDescriptor, aliceWork, changeConfig, bobWork );
}

@Test
public void labelledNodesCypher23() throws Throwable
{
Label label = Label.label( "LABEL" );
String[] entityTokens = {label.name()};
String[] entityTokens = {LABEL.name()};
SchemaDescriptor descriptor = getExistingDescriptor( entityTokens );
SchemaDescriptor newDescriptor = getNewDescriptor( entityTokens );
IndexReference initialIndex = createInitialIndex( descriptor );

Runnable aliceWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER 2.3 create (:LABEL {" + PROP + ": \"alice\"})" ).close() );
Runnable bobWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER 2.3 create (:LABEL {otherProp: \"bob\"})" ).close() );
Runnable aliceWork = work( nodesCreatedPerThread, () ->
{
db.execute( "CYPHER 2.3 create (:LABEL {" + PROP + ": \"alice\"})" ).close();
log.debug( "cypher 2.3 created an alice" );
aliceLatch.countDown();
} );
Runnable bobWork = work( nodesCreatedPerThread, () ->
{
db.execute( "CYPHER 2.3 create (:LABEL {otherProp: \"bob\"})" ).close();
log.debug( "cypher 2.3 created a bob" );
bobLatch.countDown();
} );
Runnable changeConfig = work( 1, dropAndReCreateIndex( initialIndex, newDescriptor ) );
raceContestantsAndVerifyResults( newDescriptor, aliceWork, changeConfig, bobWork );
}

@Test
public void labelledNodesCypherRule() throws Throwable
{
Label label = Label.label( "LABEL" );
String[] entityTokens = {label.name()};
String[] entityTokens = {LABEL.name()};
SchemaDescriptor descriptor = getExistingDescriptor( entityTokens );
SchemaDescriptor newDescriptor = getNewDescriptor( entityTokens );
IndexReference initialIndex = createInitialIndex( descriptor );

Runnable aliceWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER planner=rule create (:LABEL {" + PROP + ": \"alice\"})" ).close() );
Runnable bobWork = work( nodesCreatedPerThread, () -> db.execute( "CYPHER planner=rule create (:LABEL {otherProp: \"bob\"})" ).close() );
Runnable aliceWork = work( nodesCreatedPerThread, () ->
{
db.execute( "CYPHER planner=rule create (:LABEL {" + PROP + ": \"alice\"})" ).close();
log.debug( "cypher rule created an alice" );
aliceLatch.countDown();
} );
Runnable bobWork = work( nodesCreatedPerThread, () ->
{
db.execute( "CYPHER planner=rule create (:LABEL {otherProp: \"bob\"})" ).close();
log.debug( "cypher rule created a bob" );
bobLatch.countDown();
} );
Runnable changeConfig = work( 1, dropAndReCreateIndex( initialIndex, newDescriptor ) );
raceContestantsAndVerifyResults( newDescriptor, aliceWork, changeConfig, bobWork );
}
Expand Down
Expand Up @@ -25,16 +25,16 @@

/**
* {@code AsyncEvents} is a mechanism for queueing up events to be processed asynchronously in a background thread.
*
* <p>
* The {@code AsyncEvents} object implements {@link Runnable}, so it can be passed to a thread pool, or given to a
* dedicated thread. The runnable will then occupy a thread and dedicate it to background processing of events, until
* the {@code AsyncEvents} is {@link AsyncEvents#shutdown()}.
*
* <p>
* If events are sent to an {@code AsyncEvents} that has been shut down, then those events will be processed in the
* foreground as a fall-back.
*
* <p>
* Note, however, that no events are processed until the background thread is started.
*
* <p>
* The {@code AsyncEvents} is given a {@link Consumer} of the specified event type upon construction, and will use it
* for doing the actual processing of events once they have been collected.
*
Expand All @@ -46,7 +46,9 @@ public interface Monitor
{
void eventCount( long count );

Monitor NONE = count -> {};
Monitor NONE = count ->
{
};
}

// TODO use VarHandles in Java 9
Expand Down Expand Up @@ -165,7 +167,7 @@ private AsyncEvent reverseAndStripEndMark( AsyncEvent events )

/**
* Initiate the shut down process of this {@code AsyncEvents} instance.
*
* <p>
* This call does not block or otherwise wait for the background thread to terminate.
*/
public void shutdown()
Expand Down
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.api.impl.fulltext;

import org.apache.lucene.document.Document;

import java.io.IOException;
import java.io.UncheckedIOException;

Expand All @@ -27,13 +29,17 @@
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.NodePropertyAccessor;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLog;
import org.neo4j.values.storable.Value;

import static org.neo4j.kernel.api.impl.fulltext.LuceneFulltextDocumentStructure.documentRepresentingProperties;
import static org.neo4j.kernel.api.impl.fulltext.LuceneFulltextDocumentStructure.newTermForChangeOrRemove;

public class FulltextIndexAccessor extends AbstractLuceneIndexAccessor<FulltextIndexReader,DatabaseFulltextIndex>
{
static Log TRACE_LOG = NullLog.getInstance();

private final IndexUpdateSink indexUpdateSink;
private final FulltextIndexDescriptor descriptor;
private final Runnable onClose;
Expand Down Expand Up @@ -116,7 +122,12 @@ protected void addIdempotent( long entityId, Value[] values )
{
try
{
writer.updateDocument( newTermForChangeOrRemove( entityId ), documentRepresentingProperties( entityId, descriptor.propertyNames(), values ) );
Document document = documentRepresentingProperties( entityId, descriptor.propertyNames(), values );
if ( TRACE_LOG.isDebugEnabled() )
{
TRACE_LOG.debug( "updater add idempotent: %s", document.toString() );
}
writer.updateDocument( newTermForChangeOrRemove( entityId ), document );
}
catch ( IOException e )
{
Expand All @@ -129,7 +140,12 @@ public void add( long entityId, Value[] values )
{
try
{
writer.addDocument( documentRepresentingProperties( entityId, descriptor.propertyNames(), values ) );
Document document = documentRepresentingProperties( entityId, descriptor.propertyNames(), values );
if ( TRACE_LOG.isDebugEnabled() )
{
TRACE_LOG.debug( "updater add: %s", document.toString() );
}
writer.addDocument( document );
}
catch ( IOException e )
{
Expand Down

0 comments on commit 97dfe7e

Please sign in to comment.