Skip to content

Commit

Permalink
Split insert in IndexWriter into merge and put
Browse files Browse the repository at this point in the history
Taking inspiration from Map#merge.
Removed ValueAmender (now ValueMerger) that could insert non unique keys, because this is not supported by the tree.
  • Loading branch information
burqen committed Nov 22, 2016
1 parent 2ed9985 commit cb986a9
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 96 deletions.
38 changes: 17 additions & 21 deletions community/index/src/main/java/org/neo4j/index/IndexWriter.java
Expand Up @@ -22,43 +22,39 @@
import java.io.Closeable;
import java.io.IOException;

import static org.neo4j.index.ValueAmenders.overwrite;

/**
* Able to {@link #insert(Object, Object, ValueAmender)} and {@link #remove(Object)} key/value pairs
* Able to {@link #merge(Object, Object, ValueMerger)} and {@link #remove(Object)} key/value pairs
* into an {@link Index}. After all modifications have taken place the writer must be {@link #close() closed},
* typically using try-with-resource clause.
*
* @param <KEY> type of keys to insert/remove
* @param <VALUE> type of values to insert/removed
* @param <KEY> type of keys
* @param <VALUE> type of values
*/
public interface IndexWriter<KEY,VALUE> extends Closeable
{
/**
* Defaults to {@link ValueAmenders#overwrite() overwriting values} for existing key.
* Associate given {@code key} with given {@code value}.
* Any existing {@code value} associated with {@code key} will be overwritten.
*
* @param key key to insert.
* @param value value to insert for the {@code key}.
* @param key key to associate with value
* @param value value to associate with key
* @throws IOException on index access error.
*
* @see #insert(Object, Object, ValueAmender)
*/
default void insert( KEY key, VALUE value ) throws IOException
{
insert( key, value, overwrite() );
}
void put( KEY key, VALUE value ) throws IOException;

/**
* Inserts a key/value pair. In the event where {@code key} already exists the {@link ValueAmender}
* gets consulted, which can choose e.g. to insert a new key/value pair, overwrite or somehow modify
* the existing value.
* If the {@code key} doesn't already exist in the index the {@code key} will be added and the {@code value}
* associated with it. If the {@code key} already exists then its existing {@code value} will be merged with
* the given {@code value}, using the {@link ValueMerger}. If the {@link ValueMerger} returns a non-null
* value that value will be associated with the {@code key}, otherwise (if it returns {@code null}) nothing will
* be written.
*
* @param key key to insert.
* @param value value to insert for the {@code key}.
* @param amender {@link ValueAmender} to consult if key already exists.
* @param key key for which to merge values.
* @param value value to merge with currently associated value for the {@code key}.
* @param valueMerger {@link ValueMerger} to consult if key already exists.
* @throws IOException on index access error.
*/
void insert( KEY key, VALUE value, ValueAmender<VALUE> amender ) throws IOException;
void merge( KEY key, VALUE value, ValueMerger<VALUE> valueMerger ) throws IOException;

/**
* Removes a key, returning it's associated value, if found.
Expand Down
Expand Up @@ -21,19 +21,19 @@

/**
* Decides what to do when inserting key which already exists in index. Different implementations of
* {@link ValueAmender} can result in unique/non-unique indexes for example.
* {@link ValueMerger} can result in unique/non-unique indexes for example.
*
* @param <VALUE> type of values to amend.
* @param <VALUE> type of values to merge.
*/
public interface ValueAmender<VALUE>
public interface ValueMerger<VALUE>
{
/**
* Amends an existing value with a new value, returning potentially a combination of the two, or {@code null}
* if no amend was done effectively meaning that a new value should be inserted for that same key.
* Merge an existing value with a new value, returning potentially a combination of the two, or {@code null}
* if no merge was done effectively meaning that nothing should be written.
*
* @param value existing value
* @param withValue new value
* @return {@code value}, now amended with {@code withValue}, or {@code null} if no amend was done.
* @return {@code value}, now merged with {@code withValue}, or {@code null} if no merge was done.
*/
VALUE amend( VALUE value, VALUE withValue );
VALUE merge( VALUE value, VALUE withValue );
}
Expand Up @@ -20,47 +20,47 @@
package org.neo4j.index;

/**
* Common {@link ValueAmender} implementations.
* Common {@link ValueMerger} implementations.
*/
public class ValueAmenders
public class ValueMergers
{
@SuppressWarnings( "rawtypes" )
private static final ValueAmender OVERWRITE = new ValueAmender()
private static final ValueMerger OVERWRITE = new ValueMerger()
{
@Override
public Object amend( Object value, Object withValue )
public Object merge( Object value, Object withValue )
{
return withValue;
}
};

@SuppressWarnings( "rawtypes" )
private static final ValueAmender INSERT_NEW = new ValueAmender()
private static final ValueMerger KEEP_EXISTING = new ValueMerger()
{
@Override
public Object amend( Object value, Object withValue )
public Object merge( Object value, Object withValue )
{
return null;
}
};

/**
* @return {@link ValueAmender} which overwrites value for existing key when inserting.
* This makes an index have unique keys.
* @return {@link ValueMerger} which overwrites value for existing key when inserting.
* This merger guarantees unique keys in index.
*/
@SuppressWarnings( "unchecked" )
public static <VALUE> ValueAmender<VALUE> overwrite()
public static <VALUE> ValueMerger<VALUE> overwrite()
{
return OVERWRITE;
}

/**
* @return {@link ValueAmender} which inserts new key/value even for existing keys.
* This makes an index have non-unique keys.
* @return {@link ValueMerger} which keeps existing key/value otherwise adds new key/value pair.
* This merger guarantees unique keys in index.
*/
@SuppressWarnings( "unchecked" )
public static <VALUE> ValueAmender<VALUE> insertNew()
public static <VALUE> ValueMerger<VALUE> keepExisting()
{
return INSERT_NEW;
return KEEP_EXISTING;
}
}
13 changes: 10 additions & 3 deletions community/index/src/main/java/org/neo4j/index/gbptree/GBPTree.java
Expand Up @@ -29,7 +29,8 @@
import org.neo4j.index.Hit;
import org.neo4j.index.Index;
import org.neo4j.index.IndexWriter;
import org.neo4j.index.ValueAmender;
import org.neo4j.index.ValueMerger;
import org.neo4j.index.ValueMergers;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
Expand Down Expand Up @@ -420,11 +421,17 @@ SingleIndexWriter take( long rootId, IndexWriter.Options options ) throws IOExce
}

@Override
public void insert( KEY key, VALUE value, ValueAmender<VALUE> amender ) throws IOException
public void put( KEY key, VALUE value ) throws IOException
{
merge( key, value, ValueMergers.overwrite() );
}

@Override
public void merge( KEY key, VALUE value, ValueMerger<VALUE> valueMerger ) throws IOException
{
cursor.next( rootId );

SplitResult<KEY> split = treeLogic.insert( cursor, key, value, amender, options,
SplitResult<KEY> split = treeLogic.insert( cursor, key, value, valueMerger, options,
stableGeneration, unstableGeneration );

if ( cursor.checkAndClearBoundsFlag() )
Expand Down
Expand Up @@ -22,7 +22,7 @@
import java.io.IOException;

import org.neo4j.index.IndexWriter;
import org.neo4j.index.ValueAmender;
import org.neo4j.index.ValueMerger;
import org.neo4j.io.pagecache.PageCursor;

import static java.lang.Integer.max;
Expand Down Expand Up @@ -108,19 +108,19 @@ class InternalTreeLogic<KEY,VALUE>
* @param cursor {@link org.neo4j.io.pagecache.PageCursor} pinned to page where insertion is to be done.
* @param key key to be inserted
* @param value value to be associated with key
* @param amender {@link ValueAmender} for deciding what to do with existing keys
* @param valueMerger {@link ValueMerger} for deciding what to do with existing keys
* @param options options for this insert
* @param stableGeneration stable generation, i.e. generations <= this generation are considered stable.
* @param unstableGeneration unstable generation, i.e. generation which is under development right now.
* @return {@link SplitResult} from insert to be used caller.
* @throws IOException on cursor failure
*/
public SplitResult<KEY> insert( PageCursor cursor, KEY key, VALUE value, ValueAmender<VALUE> amender,
public SplitResult<KEY> insert( PageCursor cursor, KEY key, VALUE value, ValueMerger<VALUE> valueMerger,
IndexWriter.Options options, int stableGeneration, int unstableGeneration ) throws IOException
{
if ( bTreeNode.isLeaf( cursor ) )
{
return insertInLeaf( cursor, key, value, amender, options, stableGeneration, unstableGeneration );
return insertInLeaf( cursor, key, value, valueMerger, options, stableGeneration, unstableGeneration );
}

int keyCount = bTreeNode.keyCount( cursor );
Expand All @@ -137,7 +137,8 @@ public SplitResult<KEY> insert( PageCursor cursor, KEY key, VALUE value, ValueAm

cursor.next( childId );

SplitResult<KEY> split = insert( cursor, key, value, amender, options, stableGeneration, unstableGeneration );
SplitResult<KEY> split = insert( cursor, key, value, valueMerger, options, stableGeneration,
unstableGeneration );

cursor.next( currentId );

Expand Down Expand Up @@ -293,29 +294,28 @@ private static int middle( int keyCountAfterInsert, float splitLeftChildSize )
* insertion.
* @param key key to be inserted
* @param value value to be associated with key
* @param amender {@link ValueAmender} for deciding what to do with existing keys
* @param valueMerger {@link ValueMerger} for deciding what to do with existing keys
* @param options options for this insert
* @return {@link SplitResult} from insert to be used caller.
* @throws IOException on cursor failure
*/
private SplitResult<KEY> insertInLeaf( PageCursor cursor, KEY key, VALUE value, ValueAmender<VALUE> amender,
private SplitResult<KEY> insertInLeaf( PageCursor cursor, KEY key, VALUE value, ValueMerger<VALUE> valueMerger,
IndexWriter.Options options, int stableGeneration, int unstableGeneration ) throws IOException
{
int keyCount = bTreeNode.keyCount( cursor );
int search = search( cursor, bTreeNode, key, readKey, keyCount );
int pos = positionOf( search );
if ( isHit( search ) )
{
// this key already exists, what shall we do? ask the amender
// this key already exists, what shall we do? ask the valueMerger
bTreeNode.valueAt( cursor, readValue, pos );
VALUE amendedValue = amender.amend( readValue, value );
if ( amendedValue != null )
VALUE mergedValue = valueMerger.merge( readValue, value );
if ( mergedValue != null )
{
// simple, just write the amended value right in there
bTreeNode.setValueAt( cursor, amendedValue, pos );
return null; // No split has occurred
// simple, just write the merged value right in there
bTreeNode.setValueAt( cursor, mergedValue, pos );
}
// else fall-through to normal insert
return null; // No split has occurred
}

if ( keyCount < bTreeNode.leafMaxKeyCount() )
Expand All @@ -329,7 +329,7 @@ private SplitResult<KEY> insertInLeaf( PageCursor cursor, KEY key, VALUE value,
}

// Overflow, split leaf
return splitLeaf( cursor, key, value, amender, keyCount, options, stableGeneration, unstableGeneration );
return splitLeaf( cursor, key, value, keyCount, options, stableGeneration, unstableGeneration );
}

/**
Expand All @@ -338,14 +338,13 @@ private SplitResult<KEY> insertInLeaf( PageCursor cursor, KEY key, VALUE value,
* @param cursor cursor pointing into full (left) leaf that should be split in two.
* @param newKey key to be inserted
* @param newValue value to be inserted (in association with key)
* @param amender {@link ValueAmender} for deciding what to do with existing keys
* @param keyCount number of keys in this leaf (it was already read anyway)
* @param options options for this insert
* @return {@link SplitResult} with necessary information to inform parent
* @throws IOException if cursor.next( newRight ) fails
*/
private SplitResult<KEY> splitLeaf( PageCursor cursor, KEY newKey, VALUE newValue, ValueAmender<VALUE> amender,
int keyCount, IndexWriter.Options options, int stableGeneration, int unstableGeneration ) throws IOException
private SplitResult<KEY> splitLeaf( PageCursor cursor, KEY newKey, VALUE newValue, int keyCount,
IndexWriter.Options options, int stableGeneration, int unstableGeneration ) throws IOException
{
// To avoid moving cursor between pages we do all operations on left node first.
// Save data that needs transferring and then add it to right node.
Expand Down
Expand Up @@ -317,7 +317,7 @@ public void shouldSeeSimpleInsertions() throws Exception
{
for ( int i = 0; i < count; i++ )
{
writer.insert( new MutableLong( i ), new MutableLong( i ) );
writer.put( new MutableLong( i ), new MutableLong( i ) );
}
}

Expand Down Expand Up @@ -353,7 +353,7 @@ public void shouldStayCorrectAfterRandomModifications() throws Exception
{
for ( Map.Entry<MutableLong,MutableLong> entry : data.entrySet() )
{
writer.insert( entry.getKey(), entry.getValue() );
writer.put( entry.getKey(), entry.getValue() );
}
}

Expand Down Expand Up @@ -421,7 +421,7 @@ public void shouldSplitCorrectly() throws Exception
}
while ( !seen.add( key.longValue() ) );
MutableLong value = new MutableLong( i );
writer.insert( key, value );
writer.put( key, value );
seen.add( key.longValue() );
}
}
Expand Down Expand Up @@ -545,7 +545,7 @@ public void shouldReadCorrectlyWhenConcurrentlyInserting() throws Throwable
for ( int i = 0; i < groupCount; i++, inserted++ )
{
MutableLong thing = new MutableLong( inserted );
writer.insert( thing, thing );
writer.put( thing, thing );
highestId.set( inserted );
}
// Sleep a little in between update groups (transactions, sort of)
Expand Down Expand Up @@ -586,10 +586,10 @@ private void randomlyModifyIndex( Index<MutableLong,MutableLong> index,
assertEquals( "For " + key, value, removedValue );
}
else
{ // insert
{ // put
MutableLong key = randomKey( random );
MutableLong value = randomKey( random );
writer.insert( key, value );
writer.put( key, value );
data.put( key, value );
}
}
Expand Down

0 comments on commit cb986a9

Please sign in to comment.