From 000b63ea376f1ea145faf5a7b3f3defbec72756b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Thu, 4 Oct 2018 09:15:44 +0200 Subject: [PATCH] Broke out combined seeker and unit tests its functionality --- community/kernel/pom.xml | 7 + .../impl/index/schema/CombinedPartSeeker.java | 133 +++++++++++++++++ .../schema/ParallelNativeIndexPopulator.java | 140 +++++++----------- .../index/schema/CombinedPartSeekerTest.java | 125 ++++++++++++++++ 4 files changed, 322 insertions(+), 83 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/CombinedPartSeeker.java create mode 100644 community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/CombinedPartSeekerTest.java diff --git a/community/kernel/pom.xml b/community/kernel/pom.xml index b296a32f861fc..8fe4bf04ee577 100644 --- a/community/kernel/pom.xml +++ b/community/kernel/pom.xml @@ -291,5 +291,12 @@ the relevant Commercial Agreement. <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.neo4j</groupId> + <artifactId>neo4j-index</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/CombinedPartSeeker.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/CombinedPartSeeker.java new file mode 100644 index 0000000000000..2eea2bdbdf6b0 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/CombinedPartSeeker.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.index.schema; + +import java.io.IOException; +import java.util.List; + +import org.neo4j.cursor.RawCursor; +import org.neo4j.index.internal.gbptree.GBPTree; +import org.neo4j.index.internal.gbptree.Hit; +import org.neo4j.index.internal.gbptree.Layout; + +import static org.neo4j.io.IOUtils.closeAll; + +/** + * Combines multiple {@link GBPTree} seekers into one seeker, keeping the total order among all keys. + * + * @param type of key + * @param type of value + */ +class CombinedPartSeeker implements RawCursor,IOException>, Hit +{ + private final KEY end; + private final RawCursor,IOException>[] partCursors; + private final Object[] partHeads; + private final Layout layout; + private KEY nextKey; + private VALUE nextValue; + + CombinedPartSeeker( Layout layout, List,IOException>> parts ) + { + this.layout = layout; + int length = parts.size(); + this.end = layout.newKey(); + this.partCursors = parts.toArray( new RawCursor[0] ); + this.partHeads = new Object[length]; + } + + @Override + public boolean next() throws IOException + { + // Pick lowest among all candidates + int nextKeyIndex = -1; + for ( int i = 0; i < partCursors.length; i++ ) + { + // Get candidate from already seen heads, if any + KEY candidate = (KEY) partHeads[i]; + if ( candidate == end ) + { + continue; + } + + // Get candidate from seeker, if available + if ( candidate == null ) + { + if ( partCursors[i].next() ) + { + partHeads[i] = candidate = partCursors[i].get().key(); + } + else + { + partHeads[i] = end; + } + } + + // Was our candidate lower than lowest we've seen so far this round? 
+ if ( candidate != null ) + { + if ( nextKeyIndex == -1 || layout.compare( candidate, nextKey ) < 0 ) + { + nextKey = candidate; + nextKeyIndex = i; + } + } + } + + if ( nextKeyIndex != -1 ) + { + // We have a next key/value + nextValue = partCursors[nextKeyIndex].get().value(); + partHeads[nextKeyIndex] = null; + return true; + } + + // We've reached the end of all parts + nextKey = null; + nextValue = null; + return false; + } + + @Override + public void close() throws IOException + { + closeAll( partCursors ); + } + + @Override + public Hit get() + { + return this; + } + + @Override + public KEY key() + { + assert nextKey != null; + return nextKey; + } + + @Override + public VALUE value() + { + assert nextValue != null; + return nextValue; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/ParallelNativeIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/ParallelNativeIndexPopulator.java index 0bca66a38d5e3..3954035ab65e2 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/ParallelNativeIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/ParallelNativeIndexPopulator.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Queue; @@ -32,7 +33,6 @@ import org.neo4j.cursor.RawCursor; import org.neo4j.index.internal.gbptree.Hit; import org.neo4j.index.internal.gbptree.Writer; -import org.neo4j.internal.kernel.api.IndexOrder; import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; import org.neo4j.kernel.api.index.IndexEntryUpdate; import org.neo4j.kernel.api.index.IndexPopulator; @@ -40,6 +40,8 @@ import org.neo4j.kernel.api.index.NodePropertyAccessor; import org.neo4j.storageengine.api.schema.IndexSample; +import static org.neo4j.io.IOUtils.closeAll; + /** * Takes 
a somewhat high-level approach to parallelizing index population. It could be done lower level and more efficiently, * but this may be good enough. Basically since multiple threads comes in and call add w/ batches of updates, each thread @@ -173,88 +175,6 @@ public void close( boolean populationCompletedSuccessfully ) } } - private void mergeParts() throws IOException, IndexEntryConflictException - { - KEY low = layout.newKey(); - low.initialize( Long.MIN_VALUE ); - low.initValuesAsLowest(); - KEY high = layout.newKey(); - high.initialize( Long.MAX_VALUE ); - high.initValuesAsHighest(); - KEY end = layout.newKey(); - NativeIndexReader[] partReaders = new NativeIndexReader[partPopulators.size()]; - RawCursor,IOException>[] partCursors = new RawCursor[partPopulators.size()]; - Object[] partHeads = new Object[partPopulators.size()]; - int ended = 0; - for ( int i = 0; i < partPopulators.size(); i++ ) - { - ThreadLocalPopulator tlPopulator = partPopulators.get( i ); - // Apply pending updates in this populator thread - tlPopulator.applyQueuedUpdates(); - NativeIndexReader reader = tlPopulator.populator.newReader(); - partReaders[i] = reader; - partCursors[i] = reader.makeIndexSeeker( low, high, IndexOrder.ASCENDING ); - } - - try ( Writer writer = completePopulator.tree.writer() ) - { - // An idea how to parallelize the below loop: - // - Have one thread running ahead, making comparisons and leaving a trail of candidateIndexes behind it. 
- // - The thread doing the merge gets batches of candidate indexes and picks and writes w/o comparing - - // As long there's stuff left to merge - while ( ended < partCursors.length ) - { - // Pick lowest among all candidates - KEY lowestCandidate = null; - int lowestCandidateIndex = -1; - for ( int i = 0; i < partCursors.length; i++ ) - { - KEY candidate = (KEY) partHeads[i]; - if ( candidate == end ) - { - continue; - } - - if ( candidate == null ) - { - if ( partCursors[i].next() ) - { - partHeads[i] = candidate = partCursors[i].get().key(); - } - else - { - partHeads[i] = end; - ended++; - } - } - if ( candidate != null ) - { - if ( lowestCandidate == null || layout.compare( candidate, lowestCandidate ) < 0 ) - { - lowestCandidate = candidate; - lowestCandidateIndex = i; - } - } - } - - if ( lowestCandidate != null ) - { - // Oh, we have something to insert - writer.put( lowestCandidate, partCursors[lowestCandidateIndex].get().value() ); - partHeads[lowestCandidateIndex] = null; - } - } - } - finally - { - for ( NativeIndexReader partReader : partReaders ) - { - partReader.close(); - } - } - } - @Override public void markAsFailed( String failure ) { @@ -289,6 +209,7 @@ private void ensureMerged() merged = true; try { + applyAllPendingUpdates(); mergeParts(); } catch ( IOException e ) @@ -302,6 +223,59 @@ private void ensureMerged() } } + private void mergeParts() throws IOException + { + KEY from = layout.newKey(); + KEY to = layout.newKey(); + initKeysAsLowestAndHighest( from, to ); + try ( Writer writer = completePopulator.tree.writer(); + CombinedPartSeeker combinedPartSeeker = new CombinedPartSeeker<>( layout, partSeekers( from, to ) ) ) + { + while ( combinedPartSeeker.next() ) + { + writer.put( combinedPartSeeker.key(), combinedPartSeeker.value() ); + } + } + } + + private List,IOException>> partSeekers( KEY from, KEY to ) throws IOException + { + List,IOException>> seekers = new ArrayList<>(); + boolean success = false; + try + { + for ( 
ThreadLocalPopulator partPopulator : partPopulators ) + { + seekers.add( partPopulator.populator.tree.seek( from, to ) ); + } + success = true; + return seekers; + } + finally + { + if ( !success ) + { + closeAll( seekers ); + } + } + } + + private void initKeysAsLowestAndHighest( KEY low, KEY high ) + { + low.initialize( Long.MIN_VALUE ); + low.initValuesAsLowest(); + high.initialize( Long.MAX_VALUE ); + high.initValuesAsHighest(); + } + + private void applyAllPendingUpdates() throws IndexEntryConflictException + { + for ( ThreadLocalPopulator part : partPopulators ) + { + part.applyQueuedUpdates(); + } + } + /** * A thread-local NativeIndexPopulator with a queue of batched external updates. We keep these per thread because we * don't want the index populator main thread to apply updates to all the parts. diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/CombinedPartSeekerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/CombinedPartSeekerTest.java new file mode 100644 index 0000000000000..2193d72a02362 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/CombinedPartSeekerTest.java @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ */ +package org.neo4j.kernel.impl.index.schema; + +import org.apache.commons.lang3.mutable.MutableLong; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import org.neo4j.cursor.RawCursor; +import org.neo4j.index.internal.gbptree.Hit; +import org.neo4j.index.internal.gbptree.SimpleLongLayout; +import org.neo4j.test.extension.Inject; +import org.neo4j.test.extension.RandomExtension; +import org.neo4j.test.rule.RandomRule; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@ExtendWith( RandomExtension.class ) +class CombinedPartSeekerTest +{ + private static final Comparator> HIT_COMPARATOR = Comparator.comparing( Hit::key ); + + @Inject + RandomRule random; + + @Test + void shouldCombineAllParts() throws IOException + { + // given + SimpleLongLayout layout = new SimpleLongLayout( 0, "", true, 1, 2, 3 ); + List,IOException>> parts = new ArrayList<>(); + int partCount = random.nextInt( 1, 20 ); + List> expectedAllData = new ArrayList<>(); + int maxKey = random.nextInt( 100, 10_000 ); + for ( int i = 0; i < partCount; i++ ) + { + int dataSize = random.nextInt( 0, 100 ); + List> partData = new ArrayList<>( dataSize ); + for ( int j = 0; j < dataSize; j++ ) + { + long key = random.nextLong( maxKey ); + partData.add( new SimpleHit<>( new MutableLong( key ), new MutableLong( key * 2 ) ) ); + } + partData.sort( HIT_COMPARATOR ); + parts.add( new SimpleSeeker( partData ) ); + expectedAllData.addAll( partData ); + } + expectedAllData.sort( HIT_COMPARATOR ); + + // when + CombinedPartSeeker combinedSeeker = new CombinedPartSeeker<>( layout, parts ); + + // then + for ( Hit expectedHit : expectedAllData ) + { + assertTrue( combinedSeeker.next() ); + Hit actualHit = combinedSeeker.get(); + + assertEquals( 
expectedHit.key().longValue(), actualHit.key().longValue() ); + assertEquals( expectedHit.value().longValue(), actualHit.value().longValue() ); + } + assertFalse( combinedSeeker.next() ); + // And just ensure it will return false again after that + assertFalse( combinedSeeker.next() ); + } + + private static class SimpleSeeker implements RawCursor,IOException> + { + private final Iterator> data; + private Hit current; + + private SimpleSeeker( Iterable> data ) + { + this.data = data.iterator(); + } + + @Override + public boolean next() + { + if ( data.hasNext() ) + { + current = data.next(); + return true; + } + return false; + } + + @Override + public void close() + { + // Nothing to close + } + + @Override + public Hit get() + { + return current; + } + } +}