Skip to content

Commit

Permalink
IndexUpdater in the native index accessor, part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
burqen authored and tinwelint committed Jun 26, 2017
1 parent c156753 commit a995260
Show file tree
Hide file tree
Showing 12 changed files with 526 additions and 95 deletions.
Expand Up @@ -63,6 +63,9 @@ public void add( CleanupJob job )
}
}

/**
* {@link RecoveryCleanupWorkCollector} ignoring all {@link CleanupJob} added to it.
*/
class NullRecoveryCleanupWorkCollector extends LifecycleAdapter implements RecoveryCleanupWorkCollector
{
@Override
Expand Down
Expand Up @@ -23,6 +23,7 @@
* Decides what to do when inserting key which already exists in index. Different implementations of
* {@link ValueMerger} can result in unique/non-unique indexes for example.
*
* @param <KEY> type of keys to merge.
* @param <VALUE> type of values to merge.
*/
public interface ValueMerger<KEY,VALUE>
Expand Down
Expand Up @@ -21,24 +21,38 @@

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.BoundedIterable;
import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.exceptions.index.IndexNotApplicableKernelException;
import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.schema.IndexQuery;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.IndexSampler;

public class NativeSchemaNumberIndexAccessor implements IndexAccessor
public class NativeSchemaNumberIndexAccessor<KEY extends NumberKey, VALUE extends NumberValue>
extends NativeSchemaNumberIndex<KEY,VALUE> implements IndexAccessor
{
private final NativeSchemaNumberIndexUpdater<KEY,VALUE> singleUpdater;

NativeSchemaNumberIndexAccessor( PageCache pageCache, File storeFile,
Layout<KEY,VALUE> layout, RecoveryCleanupWorkCollector recoveryCleanupWorkCollector ) throws IOException
{
super( pageCache, storeFile, layout );
singleUpdater = new NativeSchemaNumberIndexUpdater<>( layout.newKey(), layout.newValue() );
instantiateTree( recoveryCleanupWorkCollector );
}

@Override
public void drop() throws IOException
{
Expand All @@ -48,25 +62,34 @@ public void drop() throws IOException
@Override
public IndexUpdater newUpdater( IndexUpdateMode mode )
{
return new NativeSchemaNumberIndexUpdater();
try
{
return singleUpdater.initialize( tree.writer(), true );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

@Override
public void flush() throws IOException
{
// todo remove this from interface completely
throw new UnsupportedOperationException( "Implement me" );
}

@Override
public void force() throws IOException
{
throw new UnsupportedOperationException( "Implement me" );
// TODO add IOLimiter arg
tree.checkpoint( IOLimiter.unlimited() );
}

@Override
public void close() throws IOException
{
throw new UnsupportedOperationException( "Implement me" );
closeTree();
}

@Override
Expand Down Expand Up @@ -94,31 +117,8 @@ public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
throw new UnsupportedOperationException( "Implement me" );
}

private class NativeSchemaNumberIndexUpdater implements IndexUpdater
{

@Override
public void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException
{
throw new UnsupportedOperationException( "Implement me" );
}

@Override
public void close() throws IOException, IndexEntryConflictException
{
throw new UnsupportedOperationException( "Implement me" );
}

@Override
public void remove( PrimitiveLongSet nodeIds ) throws IOException
{
throw new UnsupportedOperationException( "Implement me" );
}
}

private class NativeSchemaNumberIndexReader implements IndexReader
{

@Override
public void close()
{
Expand Down
Expand Up @@ -72,7 +72,8 @@ public void process( IndexEntryUpdate update ) throws IOException, IndexEntryCon
processChange( treeKey, treeValue, update, writer, conflictDetectingValueMerger );
break;
case REMOVED:
throw new UnsupportedOperationException( "Implement me" );
processRemove( treeKey, update, writer );
break;
default:
throw new IllegalArgumentException();
}
Expand Down Expand Up @@ -102,29 +103,38 @@ private void assertOpen()
}
}

private static <KEY extends NumberKey, VALUE extends NumberValue> void processRemove( KEY treeKey,
IndexEntryUpdate update, Writer<KEY,VALUE> writer ) throws IOException
{
// todo Do we need to verify that we actually removed something at all?
// todo Difference between online and recovery?
treeKey.from( update.getEntityId(), update.values() );
writer.remove( treeKey );
}

private static <KEY extends NumberKey, VALUE extends NumberValue> void processChange( KEY treeKey, VALUE treeValue,
IndexEntryUpdate update, Writer<KEY,VALUE> singleWriter,
IndexEntryUpdate update, Writer<KEY,VALUE> writer,
ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger )
throws IOException, IndexEntryConflictException
{
// Remove old entry
treeKey.from( update.getEntityId(), update.beforeValues() );
singleWriter.remove( treeKey );
// Insert new entrty
writer.remove( treeKey );
// Insert new entry
treeKey.from( update.getEntityId(), update.values() );
treeValue.from( update.values() );
singleWriter.merge( treeKey, treeValue, conflictDetectingValueMerger );
writer.merge( treeKey, treeValue, conflictDetectingValueMerger );
assertNoConflict( update, conflictDetectingValueMerger );
}

static <KEY extends NumberKey, VALUE extends NumberValue> void processAdd( KEY treeKey, VALUE treeValue,
IndexEntryUpdate update, Writer<KEY,VALUE> singleWriter,
IndexEntryUpdate update, Writer<KEY,VALUE> writer,
ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger )
throws IOException, IndexEntryConflictException
{
treeKey.from( update.getEntityId(), update.values() );
treeValue.from( update.values() );
singleWriter.merge( treeKey, treeValue, conflictDetectingValueMerger );
writer.merge( treeKey, treeValue, conflictDetectingValueMerger );
assertNoConflict( update, conflictDetectingValueMerger );
}

Expand Down
Expand Up @@ -22,6 +22,8 @@
import org.neo4j.index.internal.gbptree.GBPTree;
import org.neo4j.values.Value;

import static org.neo4j.kernel.impl.index.schema.NumberValueConversion.toValue;

/**
* Value in a {@link GBPTree} handling numbers suitable for schema indexing.
* Contains actual number for internal filtering after accidental query hits due to double value coersion.
Expand Down Expand Up @@ -72,4 +74,10 @@ else if ( value instanceof Float )
rawValueBits = value.longValue();
}
}

@Override
public String toString()
{
return "type=" + type + ",rawValue=" + rawValueBits + ",value=" + toValue( type, rawValueBits );
}
}
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

public class NativeNonUniqueSchemaNumberIndexAccessorTest
extends NativeSchemaNumberIndexAccessorTest<NumberKey,NumberValue>
{
@Override
protected LayoutTestUtil<NumberKey,NumberValue> createLayoutTestUtil()
{
return new NonUniqueLayoutTestUtil();
}
}
Expand Up @@ -94,15 +94,15 @@ public void shouldApplyLargeAmountOfInterleavedRandomUpdatesWithDuplicates() thr
populator.create();
random.reset();
Random updaterRandom = new Random( random.seed() );
Iterator<IndexEntryUpdate<IndexDescriptor>> updates = randomUniqueUpdateGenerator( random, 0.1f );
Iterator<IndexEntryUpdate<IndexDescriptor>> updates = randomUniqueUpdateGenerator( 0.1f );

// when
int count = interleaveLargeAmountOfUpdates( updaterRandom, updates );

// then
populator.close( true );
random.reset();
verifyUpdates( randomUniqueUpdateGenerator( random, 0.1f ), count );
verifyUpdates( randomUniqueUpdateGenerator( 0.1f ), count );
}

@Test
Expand Down
Expand Up @@ -26,17 +26,13 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;

import org.neo4j.helpers.collection.PrefetchingIterator;
import org.neo4j.index.internal.gbptree.GBPTree;
import org.neo4j.index.internal.gbptree.Header;
import org.neo4j.index.internal.gbptree.Layout;
Expand All @@ -51,8 +47,6 @@
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.test.rule.RandomRule;
import org.neo4j.values.Values;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -376,15 +370,15 @@ public void shouldApplyLargeAmountOfInterleavedRandomUpdates() throws Exception
populator.create();
random.reset();
Random updaterRandom = new Random( random.seed() );
Iterator<IndexEntryUpdate<IndexDescriptor>> updates = randomUniqueUpdateGenerator( random, 0 );
Iterator<IndexEntryUpdate<IndexDescriptor>> updates = randomUniqueUpdateGenerator( 0 );

// when
int count = interleaveLargeAmountOfUpdates( updaterRandom, updates );

// then
populator.close( true );
random.reset();
verifyUpdates( randomUniqueUpdateGenerator( random, 0 ), count );
verifyUpdates( randomUniqueUpdateGenerator( 0 ), count );
}

@Test
Expand Down Expand Up @@ -524,53 +518,6 @@ int interleaveLargeAmountOfUpdates( Random updaterRandom,
return count;
}

Iterator<IndexEntryUpdate<IndexDescriptor>> randomUniqueUpdateGenerator( RandomRule randomRule,
float fractionDuplicates )
{
return new PrefetchingIterator<IndexEntryUpdate<IndexDescriptor>>()
{
private final Set<Double> uniqueCompareValues = new HashSet<>();
private final List<Number> uniqueValues = new ArrayList<>();
private long currentEntityId;

@Override
protected IndexEntryUpdate<IndexDescriptor> fetchNextOrNull()
{
Number value;
if ( fractionDuplicates > 0 && !uniqueValues.isEmpty() &&
randomRule.nextFloat() < fractionDuplicates )
{
value = existingNonUniqueValue( randomRule );
}
else
{
value = newUniqueValue( randomRule );
}

return add( currentEntityId++, value );
}

private Number newUniqueValue( RandomRule randomRule )
{
Number value;
Double compareValue;
do
{
value = randomRule.numberPropertyValue();
compareValue = value.doubleValue();
}
while ( !uniqueCompareValues.add( compareValue ) );
uniqueValues.add( value );
return value;
}

private Number existingNonUniqueValue( RandomRule randomRule )
{
return uniqueValues.get( randomRule.nextInt( uniqueValues.size() ) );
}
};
}

private void assertHeader( boolean online, String failureMessage, boolean messageTruncated ) throws IOException
{
NativeSchemaIndexHeaderReader headerReader = new NativeSchemaIndexHeaderReader();
Expand Down

0 comments on commit a995260

Please sign in to comment.