Skip to content

Commit

Permalink
Broke out combined seeker and unit-tested its functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Oct 10, 2018
1 parent 7008cf7 commit 000b63e
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 83 deletions.
7 changes: 7 additions & 0 deletions community/kernel/pom.xml
Expand Up @@ -291,5 +291,12 @@ the relevant Commercial Agreement.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-index</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,133 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.IOException;
import java.util.List;

import org.neo4j.cursor.RawCursor;
import org.neo4j.index.internal.gbptree.GBPTree;
import org.neo4j.index.internal.gbptree.Hit;
import org.neo4j.index.internal.gbptree.Layout;

import static org.neo4j.io.IOUtils.closeAll;

/**
 * Presents several {@link GBPTree} seekers as one single seeker. Each call to {@link #next()} looks at the
 * current head key of every part and selects the lowest one according to {@link Layout#compare(Object, Object)},
 * so the combined stream follows that order as long as every part does. When two heads compare as equal,
 * the part that comes first in the supplied list is returned first.
 * <p>
 * This seeker is its own {@link Hit}: {@link #get()} returns {@code this}, and {@link #key()} / {@link #value()}
 * reflect the entry selected by the most recent successful {@link #next()}.
 *
 * @param <KEY> type of key
 * @param <VALUE> type of value
 */
class CombinedPartSeeker<KEY,VALUE> implements RawCursor<Hit<KEY,VALUE>,IOException>, Hit<KEY,VALUE>
{
    /** Marker stored in {@link #heads} for a part that has no more entries; compared by identity only. */
    private final KEY exhausted;
    private final RawCursor<Hit<KEY,VALUE>,IOException>[] partCursors;
    /** Per part: {@code null} if a new head must be fetched, {@link #exhausted} if the part is done, else its head key. */
    private final Object[] heads;
    private final Layout<KEY,VALUE> layout;
    private KEY nextKey;
    private VALUE nextValue;

    /**
     * @param layout {@link Layout} used for comparing keys and for instantiating the internal end marker key.
     * @param parts the seekers to combine. They are closed when this seeker is closed.
     */
    CombinedPartSeeker( Layout<KEY,VALUE> layout, List<RawCursor<Hit<KEY,VALUE>,IOException>> parts )
    {
        this.layout = layout;
        int partCount = parts.size();
        this.exhausted = layout.newKey();
        this.partCursors = parts.toArray( new RawCursor[0] );
        this.heads = new Object[partCount];
    }

    /**
     * Moves to the lowest entry among the heads of all parts.
     *
     * @return {@code true} if an entry was selected, {@code false} if every part is exhausted.
     * @throws IOException if advancing any of the part cursors fails.
     */
    @Override
    public boolean next() throws IOException
    {
        int winner = -1;
        for ( int part = 0; part < partCursors.length; part++ )
        {
            KEY head = headOf( part );
            if ( head == null )
            {
                // Nothing to offer from this part in this round
                continue;
            }

            // nextKey holds the lowest head found so far in this round once there is a winner
            if ( winner == -1 || layout.compare( head, nextKey ) < 0 )
            {
                nextKey = head;
                winner = part;
            }
        }

        if ( winner == -1 )
        {
            // All parts have been drained
            nextKey = null;
            nextValue = null;
            return false;
        }

        nextValue = partCursors[winner].get().value();
        // Consume the winning head so that this part gets advanced on the next call
        heads[winner] = null;
        return true;
    }

    /**
     * Returns the current head key of the given part, advancing its cursor first if the previous head was consumed.
     *
     * @param part index of the part.
     * @return the head key, or {@code null} if the part has nothing to offer.
     * @throws IOException if advancing the part cursor fails.
     */
    private KEY headOf( int part ) throws IOException
    {
        Object head = heads[part];
        if ( head == exhausted )
        {
            return null;
        }

        if ( head == null )
        {
            if ( !partCursors[part].next() )
            {
                heads[part] = exhausted;
                return null;
            }
            head = partCursors[part].get().key();
            heads[part] = head;
        }
        return (KEY) head;
    }

    @Override
    public void close() throws IOException
    {
        closeAll( partCursors );
    }

    @Override
    public Hit<KEY,VALUE> get()
    {
        return this;
    }

    @Override
    public KEY key()
    {
        assert nextKey != null;
        return nextKey;
    }

    @Override
    public VALUE value()
    {
        assert nextValue != null;
        return nextValue;
    }
}
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
Expand All @@ -32,14 +33,15 @@
import org.neo4j.cursor.RawCursor;
import org.neo4j.index.internal.gbptree.Hit;
import org.neo4j.index.internal.gbptree.Writer;
import org.neo4j.internal.kernel.api.IndexOrder;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.NodePropertyAccessor;
import org.neo4j.storageengine.api.schema.IndexSample;

import static org.neo4j.io.IOUtils.closeAll;

/**
* Takes a somewhat high-level approach to parallelizing index population. It could be done lower level and more efficiently,
* but this may be good enough. Basically since multiple threads come in and call add w/ batches of updates, each thread
Expand Down Expand Up @@ -173,88 +175,6 @@ public void close( boolean populationCompletedSuccessfully )
}
}

/**
 * Merges the contents of all part populators into the tree of {@code completePopulator}.
 * <p>
 * For every part its queued updates are applied first, then a reader and an ascending seeker spanning
 * {@code low} to {@code high} are opened on it. The seekers are then merged: each round looks at the head key
 * of every part, picks the lowest one according to {@code layout.compare} and writes that entry to the writer
 * of the complete tree. When two heads compare as equal the part with the lower index is written first.
 *
 * @throws IOException if advancing a part cursor fails.
 * @throws IndexEntryConflictException presumably from applying the queued updates of a part -- TODO confirm.
 */
private void mergeParts() throws IOException, IndexEntryConflictException
{
// Seek boundaries: "low" is initialized as lowest and "high" as highest, so the seek is meant to span all keys
KEY low = layout.newKey();
low.initialize( Long.MIN_VALUE );
low.initValuesAsLowest();
KEY high = layout.newKey();
high.initialize( Long.MAX_VALUE );
high.initValuesAsHighest();
// Marker instance placed in partHeads for an exhausted part, only ever compared by identity
KEY end = layout.newKey();
NativeIndexReader<KEY,VALUE>[] partReaders = new NativeIndexReader[partPopulators.size()];
RawCursor<Hit<KEY,VALUE>,IOException>[] partCursors = new RawCursor[partPopulators.size()];
// Per part: null = head needs to be fetched from the cursor, end = part exhausted, otherwise the pending head key
Object[] partHeads = new Object[partPopulators.size()];
// Number of part cursors that have been exhausted so far
int ended = 0;
// NOTE(review): this setup runs outside the try/finally below, so if it fails midway the readers
// which have already been opened are not closed by this method.
for ( int i = 0; i < partPopulators.size(); i++ )
{
ThreadLocalPopulator tlPopulator = partPopulators.get( i );
// Apply pending updates in this populator thread
tlPopulator.applyQueuedUpdates();
NativeIndexReader<KEY,VALUE> reader = tlPopulator.populator.newReader();
partReaders[i] = reader;
partCursors[i] = reader.makeIndexSeeker( low, high, IndexOrder.ASCENDING );
}

try ( Writer<KEY,VALUE> writer = completePopulator.tree.writer() )
{
// An idea how to parallelize the below loop:
// - Have one thread running ahead, making comparisons and leaving a trail of candidateIndexes behind it.
// - The thread doing the merge gets batches of candidate indexes and picks and writes w/o comparing

// As long there's stuff left to merge
while ( ended < partCursors.length )
{
// Pick lowest among all candidates
KEY lowestCandidate = null;
int lowestCandidateIndex = -1;
for ( int i = 0; i < partCursors.length; i++ )
{
KEY candidate = (KEY) partHeads[i];
if ( candidate == end )
{
// This part has been exhausted in an earlier round
continue;
}

if ( candidate == null )
{
// The previous head of this part was consumed (or never fetched), so advance its cursor
if ( partCursors[i].next() )
{
partHeads[i] = candidate = partCursors[i].get().key();
}
else
{
partHeads[i] = end;
ended++;
}
}
if ( candidate != null )
{
// Strictly-lower comparison, i.e. on equal keys the earlier part stays the lowest candidate
if ( lowestCandidate == null || layout.compare( candidate, lowestCandidate ) < 0 )
{
lowestCandidate = candidate;
lowestCandidateIndex = i;
}
}
}

// Can be null in the round where the last remaining parts turned out to be exhausted
if ( lowestCandidate != null )
{
// Oh, we have something to insert
writer.put( lowestCandidate, partCursors[lowestCandidateIndex].get().value() );
// Mark the head as consumed so that this part gets advanced in the next round
partHeads[lowestCandidateIndex] = null;
}
}
}
finally
{
// NOTE(review): only the readers are closed, the cursors are never closed explicitly here;
// presumably closing a reader also takes care of the seekers it has handed out -- confirm.
for ( NativeIndexReader<KEY,VALUE> partReader : partReaders )
{
partReader.close();
}
}
}

@Override
public void markAsFailed( String failure )
{
Expand Down Expand Up @@ -289,6 +209,7 @@ private void ensureMerged()
merged = true;
try
{
applyAllPendingUpdates();
mergeParts();
}
catch ( IOException e )
Expand All @@ -302,6 +223,59 @@ private void ensureMerged()
}
}

/**
 * Copies every entry from all the part trees into the tree of {@code completePopulator}. The parts are read
 * through one {@link CombinedPartSeeker} spanning the lowest to the highest key, and each entry it yields is
 * put into the writer of the complete tree. Both the writer and the combined seeker are closed on exit.
 *
 * @throws IOException on failure to read from the parts.
 */
private void mergeParts() throws IOException
{
    KEY lowest = layout.newKey();
    KEY highest = layout.newKey();
    initKeysAsLowestAndHighest( lowest, highest );
    try ( Writer<KEY,VALUE> mergedWriter = completePopulator.tree.writer();
          CombinedPartSeeker<KEY,VALUE> allParts = new CombinedPartSeeker<>( layout, partSeekers( lowest, highest ) ) )
    {
        for ( boolean hasEntry = allParts.next(); hasEntry; hasEntry = allParts.next() )
        {
            mergedWriter.put( allParts.key(), allParts.value() );
        }
    }
}

/**
 * Opens one seeker per part populator tree, all of them over the same key range.
 * If opening any of them fails, the seekers opened up to that point are closed before the failure propagates.
 *
 * @param from key passed as start of the range to every part tree seek.
 * @param to key passed as end of the range to every part tree seek.
 * @return the opened seekers, in the iteration order of {@code partPopulators}. The caller must close them.
 * @throws IOException on failure to open a seeker, or on failure to close the already opened ones after such a failure.
 */
private List<RawCursor<Hit<KEY,VALUE>,IOException>> partSeekers( KEY from, KEY to ) throws IOException
{
    List<RawCursor<Hit<KEY,VALUE>,IOException>> opened = new ArrayList<>();
    try
    {
        for ( ThreadLocalPopulator partPopulator : partPopulators )
        {
            opened.add( partPopulator.populator.tree.seek( from, to ) );
        }
    }
    catch ( Throwable failure )
    {
        // Don't leak the seekers which were opened before the failure
        closeAll( opened );
        throw failure;
    }
    return opened;
}

/**
 * Initializes the two given keys so that they can act as the boundaries of a seek over a whole tree.
 *
 * @param lowest key which gets initialized with {@link Long#MIN_VALUE} and has its values set as lowest.
 * @param highest key which gets initialized with {@link Long#MAX_VALUE} and has its values set as highest.
 */
private void initKeysAsLowestAndHighest( KEY lowest, KEY highest )
{
    // Lower boundary
    lowest.initialize( Long.MIN_VALUE );
    lowest.initValuesAsLowest();

    // Upper boundary
    highest.initialize( Long.MAX_VALUE );
    highest.initValuesAsHighest();
}

/**
 * Applies the queued updates of every part populator, one part at a time in the iteration order of
 * {@code partPopulators}. Stops at the first part that fails.
 *
 * @throws IndexEntryConflictException if applying the queued updates of a part fails with a conflict.
 */
private void applyAllPendingUpdates() throws IndexEntryConflictException
{
    java.util.Iterator<? extends ThreadLocalPopulator> remainingParts = partPopulators.iterator();
    while ( remainingParts.hasNext() )
    {
        remainingParts.next().applyQueuedUpdates();
    }
}

/**
* A thread-local NativeIndexPopulator with a queue of batched external updates. We keep these per thread because we
* don't want the index populator main thread to apply updates to all the parts.
Expand Down

0 comments on commit 000b63e

Please sign in to comment.