Improve the multi-threaded write throughput of the bloom fulltext index add-on by about a factor of four.

We now drain the work queue in the applier, apply the updates as one batch, and refresh any given index only once per batch.
chrisvest committed Sep 19, 2017
1 parent 994325f commit b4078be
Showing 3 changed files with 125 additions and 14 deletions.
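To make the change easier to follow before reading the diff, here is a minimal, self-contained sketch of the drain-and-batch pattern the commit message describes. It is an illustration only: the type names (Index, Update, BatchedApplierSketch) and the use of CountDownLatch are assumptions standing in for the add-on's actual WritableFulltext, FulltextIndexUpdate and BinaryLatch types shown in the diff below.

import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Standalone sketch of the batching idea; names are illustrative, not the Neo4j classes.
public class BatchedApplierSketch
{
    interface Index
    {
        void apply( String change );   // cheap per-update work
        void refresh();                // expensive; should run at most once per batch
    }

    static final class Update
    {
        final Index index;
        final String change;
        final CountDownLatch done = new CountDownLatch( 1 );

        Update( Index index, String change )
        {
            this.index = index;
            this.change = change;
        }
    }

    private final LinkedBlockingQueue<Update> workQueue = new LinkedBlockingQueue<>();

    void enqueue( Update update )
    {
        workQueue.add( update );
    }

    void applierLoop() throws InterruptedException
    {
        // Identity-based set so each distinct index is refreshed only once per batch.
        Set<Index> refreshable = Collections.newSetFromMap( new IdentityHashMap<>() );
        List<CountDownLatch> latches = new ArrayList<>();

        while ( !Thread.currentThread().isInterrupted() )
        {
            Update update = workQueue.take();          // block until the first update arrives
            do
            {
                update.index.apply( update.change );   // apply every queued update first
                refreshable.add( update.index );
                latches.add( update.done );
                update = workQueue.poll();             // drain whatever else is already queued
            }
            while ( update != null );

            refreshable.forEach( Index::refresh );     // one refresh per index for the whole batch
            refreshable.clear();

            latches.forEach( CountDownLatch::countDown ); // then unblock all waiters at once
            latches.clear();
        }
    }
}

The point of the batching is that the expensive per-index refresh, and the release of waiting latches, are paid once per drained batch instead of once per queued update, which is what the commit message attributes the roughly fourfold throughput gain to.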

FulltextUpdateApplier:

@@ -22,7 +22,10 @@
 import org.apache.lucene.document.Document;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -58,7 +61,8 @@ class FulltextUpdateApplier
         workQueue = new LinkedBlockingQueue<>();
     }

-    <E extends Entity> BinaryLatch updatePropertyData( Map<Long,Map<String,Object>> state, WritableFulltext index ) throws
+    <E extends Entity> BinaryLatch updatePropertyData( Map<Long,Map<String,Object>> state, WritableFulltext index )
+            throws
             IOException
     {
         BinaryLatch completedLatch = new BinaryLatch();
@@ -98,8 +102,8 @@ private static void updateDocument( PartitionedIndexWriter indexWriter, long ent
     }

     <E extends Entity> BinaryLatch removePropertyData( Iterable<PropertyEntry<E>> propertyEntries,
             Map<Long,Map<String,Object>> state,
             WritableFulltext index ) throws IOException
     {
         BinaryLatch completedLatch = new BinaryLatch();
         FulltextIndexUpdate update = () ->
@@ -214,7 +218,7 @@ void stop()

     private interface FulltextIndexUpdate
     {
-        Pair<WritableFulltext, BinaryLatch> applyUpdateAndReturnIndex() throws IOException;
+        Pair<WritableFulltext,BinaryLatch> applyUpdateAndReturnIndex() throws IOException;
     }

     private static class ApplierThread extends Thread
@@ -233,13 +237,53 @@ private static class ApplierThread extends Thread
         @Override
         public void run()
         {
+            Set<WritableFulltext> refreshableSet = Collections.newSetFromMap( new IdentityHashMap<>() );
+            List<BinaryLatch> latches = new ArrayList<>();
+
             FulltextIndexUpdate update;
             while ( (update = getNextUpdate()) != STOP_SIGNAL )
             {
-                Pair<WritableFulltext,BinaryLatch> updateProgress = applyUpdate( update );
-                refreshIndex( updateProgress.first() );
-                updateProgress.other().release();
+                update = drainQueueAndApplyUpdates( update, refreshableSet, latches );
+                refreshAndClearIndexes( refreshableSet );
+                releaseAndClearLatches( latches );
+
+                if ( update == STOP_SIGNAL )
+                {
+                    return;
+                }
             }
         }

+        private FulltextIndexUpdate drainQueueAndApplyUpdates(
+                FulltextIndexUpdate update,
+                Set<WritableFulltext> refreshableSet,
+                List<BinaryLatch> latches )
+        {
+            do
+            {
+                applyUpdate( update, refreshableSet, latches );
+                update = workQueue.poll();
+            }
+            while ( update != null && update != STOP_SIGNAL );
+            return update;
+        }
+
+        private void refreshAndClearIndexes( Set<WritableFulltext> refreshableSet )
+        {
+            for ( WritableFulltext index : refreshableSet )
+            {
+                refreshIndex( index );
+            }
+            refreshableSet.clear();
+        }
+
+        private void releaseAndClearLatches( List<BinaryLatch> latches )
+        {
+            for ( BinaryLatch latch : latches )
+            {
+                latch.release();
+            }
+            latches.clear();
+        }
+
         private FulltextIndexUpdate getNextUpdate()
@@ -260,17 +304,20 @@ private FulltextIndexUpdate getNextUpdate()
             return update;
         }

-        private Pair<WritableFulltext,BinaryLatch> applyUpdate( FulltextIndexUpdate update )
+        private void applyUpdate( FulltextIndexUpdate update,
+                Set<WritableFulltext> refreshableSet,
+                List<BinaryLatch> latches )
         {
             try
             {
-                return update.applyUpdateAndReturnIndex();
+                Pair<WritableFulltext,BinaryLatch> updateProgress = update.applyUpdateAndReturnIndex();
+                refreshableSet.add( updateProgress.first() );
+                latches.add( updateProgress.other() );
             }
             catch ( IOException e )
             {
                 log.error( "Failed to apply fulltext index update.", e );
             }
-            return null;
         }

         private void refreshIndex( WritableFulltext index )

WritableFulltext:

@@ -27,17 +27,14 @@

 class WritableFulltext extends WritableAbstractDatabaseIndex<LuceneFulltext>
 {
-    private final LuceneFulltext luceneFulltext;
-
     WritableFulltext( LuceneFulltext luceneFulltext )
     {
         super( luceneFulltext );
-        this.luceneFulltext = luceneFulltext;
     }

     PartitionedIndexWriter getIndexWriter() throws IOException
     {
-        return luceneFulltext.getIndexWriter( this );
+        return luceneIndex.getIndexWriter( this );
     }

     Set<String> properties()

Bloom fulltext index test:

@@ -21,13 +21,15 @@

 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;

 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;

 import org.neo4j.graphdb.GraphDatabaseService;
 import org.neo4j.graphdb.Node;
@@ -315,6 +317,71 @@ public void updatesAreAvailableToConcurrentReadTransactions() throws Exception
         }
     }

+    @Ignore( "This is more of a rudimentary write-throughput benchmark, which we can delete or turn into a proper " +
+             "benchmark at a later date")
+    @Test
+    public void fiveHundredThousandUpdates() throws Exception
+    {
+        GraphDatabaseBuilder builder = factory.newEmbeddedDatabaseBuilder( testDirectory.graphDbDir() );
+        builder.setConfig( LoadableBloomFulltextConfig.bloom_indexed_properties, "prop" );
+        db = builder.newGraphDatabase();
+
+        int trials = 50;
+        int threadCount = 10;
+        int updatesPerThread = 1000;
+        int updatesPerIteration = 10;
+        int iterationsPerThread = updatesPerThread / updatesPerIteration;
+        String[] words =
+                ("dui nunc mattis enim ut tellus elementum sagittis vitae et leo duis ut diam quam nulla porttitor " +
+                 "massa id neque aliquam vestibulum morbi blandit cursus risus at ultrices mi tempus imperdiet nulla " +
+                 "malesuada pellentesque elit eget gravida cum sociis natoque penatibus et magnis dis parturient " +
+                 "montes nascetur ridiculus mus mauris").split( " " );
+
+        Runnable work = () ->
+        {
+            ThreadLocalRandom rng = ThreadLocalRandom.current();
+            StringBuilder sb = new StringBuilder();
+            for ( int i = 0; i < iterationsPerThread; i++ )
+            {
+                try ( Transaction tx = db.beginTx() )
+                {
+                    for ( int j = 0; j < updatesPerIteration; j++ )
+                    {
+                        sb.setLength( 0 );
+                        int wordCount = rng.nextInt( 3, 7 );
+                        for ( int k = 0; k < wordCount; k++ )
+                        {
+                            sb.append( words[rng.nextInt( words.length )] ).append( ' ' );
+                        }
+                        sb.setLength( sb.length() - 1 );
+                        db.createNode().setProperty( "prop", sb.toString() );
+                    }
+                    tx.success();
+                }
+            }
+        };
+
+        for ( int i = 0; i < trials; i++ )
+        {
+            long startMillis = System.currentTimeMillis();
+            Thread[] threads = new Thread[threadCount];
+            for ( int j = 0; j < threadCount; j++ )
+            {
+                threads[j] = new Thread( work );
+            }
+            for ( Thread thread : threads )
+            {
+                thread.start();
+            }
+            for ( Thread thread : threads )
+            {
+                thread.join();
+            }
+            long elapsedMillis = System.currentTimeMillis() - startMillis;
+            System.out.printf( "elapsed: %s ms.%n", elapsedMillis );
+        }
+    }
+
     @Test
     public void shouldNotBeAbleToStartWithoutConfiguringProperties() throws Exception
     {