From b4078befe02d75e85eb643699bf7609dc59af4d1 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 14 Sep 2017 14:28:40 +0200 Subject: [PATCH] Improve the multi-threaded write throughput of the bloom fulltext index add-on by about a factor four. We now drain the work queue in the applier and apply the updates as one batch, and only refresh any given index once per batch. --- .../impl/fulltext/FulltextUpdateApplier.java | 67 ++++++++++++++++--- .../api/impl/fulltext/WritableFulltext.java | 5 +- .../fulltext/integrations/bloom/BloomIT.java | 67 +++++++++++++++++++ 3 files changed, 125 insertions(+), 14 deletions(-) diff --git a/enterprise/fulltext-addon/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextUpdateApplier.java b/enterprise/fulltext-addon/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextUpdateApplier.java index abd6683d5f792..b86a860d41eb2 100644 --- a/enterprise/fulltext-addon/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextUpdateApplier.java +++ b/enterprise/fulltext-addon/src/main/java/org/neo4j/kernel/api/impl/fulltext/FulltextUpdateApplier.java @@ -22,7 +22,10 @@ import org.apache.lucene.document.Document; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; @@ -58,7 +61,8 @@ class FulltextUpdateApplier workQueue = new LinkedBlockingQueue<>(); } - BinaryLatch updatePropertyData( Map> state, WritableFulltext index ) throws + BinaryLatch updatePropertyData( Map> state, WritableFulltext index ) + throws IOException { BinaryLatch completedLatch = new BinaryLatch(); @@ -98,8 +102,8 @@ private static void updateDocument( PartitionedIndexWriter indexWriter, long ent } BinaryLatch removePropertyData( Iterable> propertyEntries, - Map> state, - WritableFulltext index ) throws IOException + Map> state, + WritableFulltext index ) throws IOException { BinaryLatch completedLatch = new BinaryLatch(); FulltextIndexUpdate update = () -> @@ -214,7 +218,7 @@ void stop() private interface FulltextIndexUpdate { - Pair applyUpdateAndReturnIndex() throws IOException; + Pair applyUpdateAndReturnIndex() throws IOException; } private static class ApplierThread extends Thread @@ -233,13 +237,53 @@ private static class ApplierThread extends Thread @Override public void run() { + Set refreshableSet = Collections.newSetFromMap( new IdentityHashMap<>() ); + List latches = new ArrayList<>(); + FulltextIndexUpdate update; while ( (update = getNextUpdate()) != STOP_SIGNAL ) { - Pair updateProgress = applyUpdate( update ); - refreshIndex( updateProgress.first() ); - updateProgress.other().release(); + update = drainQueueAndApplyUpdates( update, refreshableSet, latches ); + refreshAndClearIndexes( refreshableSet ); + releaseAndClearLatches( latches ); + + if ( update == STOP_SIGNAL ) + { + return; + } + } + } + + private FulltextIndexUpdate drainQueueAndApplyUpdates( + FulltextIndexUpdate update, + Set refreshableSet, + List latches ) + { + do + { + applyUpdate( update, refreshableSet, latches ); + update = workQueue.poll(); + } + while ( update != null && update != STOP_SIGNAL ); + return update; + } + + private void refreshAndClearIndexes( Set refreshableSet ) + { + for ( WritableFulltext index : refreshableSet ) + { + refreshIndex( index ); + } + refreshableSet.clear(); + } + + private void releaseAndClearLatches( List latches ) + { + for ( BinaryLatch latch : latches ) + { + latch.release(); } + latches.clear(); } private FulltextIndexUpdate getNextUpdate() @@ -260,17 +304,20 @@ private FulltextIndexUpdate getNextUpdate() return update; } - private Pair applyUpdate( FulltextIndexUpdate update ) + private void applyUpdate( FulltextIndexUpdate update, + Set refreshableSet, + List latches ) { try { - return update.applyUpdateAndReturnIndex(); + Pair updateProgress = update.applyUpdateAndReturnIndex(); + refreshableSet.add( updateProgress.first() ); + latches.add( updateProgress.other() ); } catch ( IOException e ) { log.error( "Failed to apply fulltext index update.", e ); } - return null; } private void refreshIndex( WritableFulltext index ) diff --git a/enterprise/fulltext-addon/src/main/java/org/neo4j/kernel/api/impl/fulltext/WritableFulltext.java b/enterprise/fulltext-addon/src/main/java/org/neo4j/kernel/api/impl/fulltext/WritableFulltext.java index 1d2b16be58e0c..e2d2ae36e28df 100644 --- a/enterprise/fulltext-addon/src/main/java/org/neo4j/kernel/api/impl/fulltext/WritableFulltext.java +++ b/enterprise/fulltext-addon/src/main/java/org/neo4j/kernel/api/impl/fulltext/WritableFulltext.java @@ -27,17 +27,14 @@ class WritableFulltext extends WritableAbstractDatabaseIndex { - private final LuceneFulltext luceneFulltext; - WritableFulltext( LuceneFulltext luceneFulltext ) { super( luceneFulltext ); - this.luceneFulltext = luceneFulltext; } PartitionedIndexWriter getIndexWriter() throws IOException { - return luceneFulltext.getIndexWriter( this ); + return luceneIndex.getIndexWriter( this ); } Set properties() diff --git a/enterprise/fulltext-addon/src/test/java/org/neo4j/kernel/api/impl/fulltext/integrations/bloom/BloomIT.java b/enterprise/fulltext-addon/src/test/java/org/neo4j/kernel/api/impl/fulltext/integrations/bloom/BloomIT.java index 0e0225bda18b9..4432e26680b39 100644 --- a/enterprise/fulltext-addon/src/test/java/org/neo4j/kernel/api/impl/fulltext/integrations/bloom/BloomIT.java +++ b/enterprise/fulltext-addon/src/test/java/org/neo4j/kernel/api/impl/fulltext/integrations/bloom/BloomIT.java @@ -21,6 +21,7 @@ import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -28,6 +29,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Node; @@ -315,6 +317,71 @@ public void updatesAreAvailableToConcurrentReadTransactions() throws Exception } } + @Ignore( "This is more of a rudimentary write-throughput benchmark, which we can delete or turn into a proper " + + "benchmark at a later date") + @Test + public void fiveHundredThousandUpdates() throws Exception + { + GraphDatabaseBuilder builder = factory.newEmbeddedDatabaseBuilder( testDirectory.graphDbDir() ); + builder.setConfig( LoadableBloomFulltextConfig.bloom_indexed_properties, "prop" ); + db = builder.newGraphDatabase(); + + int trials = 50; + int threadCount = 10; + int updatesPerThread = 1000; + int updatesPerIteration = 10; + int iterationsPerThread = updatesPerThread / updatesPerIteration; + String[] words = + ("dui nunc mattis enim ut tellus elementum sagittis vitae et leo duis ut diam quam nulla porttitor " + + "massa id neque aliquam vestibulum morbi blandit cursus risus at ultrices mi tempus imperdiet nulla " + + "malesuada pellentesque elit eget gravida cum sociis natoque penatibus et magnis dis parturient " + + "montes nascetur ridiculus mus mauris").split( " " ); + + Runnable work = () -> + { + ThreadLocalRandom rng = ThreadLocalRandom.current(); + StringBuilder sb = new StringBuilder(); + for ( int i = 0; i < iterationsPerThread; i++ ) + { + try ( Transaction tx = db.beginTx() ) + { + for ( int j = 0; j < updatesPerIteration; j++ ) + { + sb.setLength( 0 ); + int wordCount = rng.nextInt( 3, 7 ); + for ( int k = 0; k < wordCount; k++ ) + { + sb.append( words[rng.nextInt( words.length )] ).append( ' ' ); + } + sb.setLength( sb.length() - 1 ); + db.createNode().setProperty( "prop", sb.toString() ); + } + tx.success(); + } + } + }; + + for ( int i = 0; i < trials; i++ ) + { + long startMillis = System.currentTimeMillis(); + Thread[] threads = new Thread[threadCount]; + for ( int j = 0; j < threadCount; j++ ) + { + threads[j] = new Thread( work ); + } + for ( Thread thread : threads ) + { + thread.start(); + } + for ( Thread thread : threads ) + { + thread.join(); + } + long elapsedMillis = System.currentTimeMillis() - startMillis; + System.out.printf( "elapsed: %s ms.%n", elapsedMillis ); + } + } + @Test public void shouldNotBeAbleToStartWithoutConfiguringProperties() throws Exception {