Skip to content

Commit

Permalink
Fusion layer instantiates parts lazily
Browse files Browse the repository at this point in the history
Instead of always instantiating eagerly. E.g. an IndexReader would previously
instantiate sub-readers for lucene, number, string, spatial and temporal
even if it would only need one of those. Likewise they would all be closed
afterwards. This proved to be an unnecessary overhead.
  • Loading branch information
tinwelint committed May 16, 2018
1 parent ade8a56 commit 88020d3
Show file tree
Hide file tree
Showing 21 changed files with 423 additions and 341 deletions.
Expand Up @@ -19,8 +19,6 @@
*/ */
package org.neo4j.kernel.impl.index.schema; package org.neo4j.kernel.impl.index.schema;


import java.io.IOException;

import org.neo4j.collection.primitive.PrimitiveLongResourceIterator; import org.neo4j.collection.primitive.PrimitiveLongResourceIterator;
import org.neo4j.graphdb.Resource; import org.neo4j.graphdb.Resource;
import org.neo4j.helpers.collection.Iterators; import org.neo4j.helpers.collection.Iterators;
Expand Down
Expand Up @@ -19,8 +19,6 @@
*/ */
package org.neo4j.kernel.impl.index.schema; package org.neo4j.kernel.impl.index.schema;


import java.io.IOException;

import org.neo4j.collection.primitive.PrimitiveLongResourceIterator; import org.neo4j.collection.primitive.PrimitiveLongResourceIterator;
import org.neo4j.graphdb.Resource; import org.neo4j.graphdb.Resource;
import org.neo4j.helpers.collection.Iterators; import org.neo4j.helpers.collection.Iterators;
Expand Down
Expand Up @@ -25,9 +25,10 @@
import org.neo4j.kernel.api.index.IndexEntryUpdate; import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode; import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.kernel.impl.index.schema.fusion.FusionIndexBase;
import org.neo4j.values.storable.ValueGroup; import org.neo4j.values.storable.ValueGroup;


import static org.neo4j.kernel.impl.index.schema.fusion.FusionIndexBase.forAll;

public class TemporalIndexUpdater extends TemporalIndexCache<NativeSchemaIndexUpdater<?, NativeSchemaValue>> implements IndexUpdater public class TemporalIndexUpdater extends TemporalIndexCache<NativeSchemaIndexUpdater<?, NativeSchemaValue>> implements IndexUpdater
{ {
TemporalIndexUpdater( TemporalIndexAccessor accessor, IndexUpdateMode mode ) TemporalIndexUpdater( TemporalIndexAccessor accessor, IndexUpdateMode mode )
Expand Down Expand Up @@ -68,7 +69,7 @@ public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntry
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
FusionIndexBase.forAll( NativeSchemaIndexUpdater::close, this ); forAll( NativeSchemaIndexUpdater::close, this );
} }


static class PartFactory implements TemporalIndexCache.Factory<NativeSchemaIndexUpdater<?, NativeSchemaValue>> static class PartFactory implements TemporalIndexCache.Factory<NativeSchemaIndexUpdater<?, NativeSchemaValue>>
Expand Down
Expand Up @@ -21,9 +21,7 @@


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;


import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.BoundedIterable; import org.neo4j.helpers.collection.BoundedIterable;
Expand All @@ -36,26 +34,26 @@
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor; import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode; import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.kernel.impl.index.schema.fusion.FusionIndexProvider.DropAction; import org.neo4j.kernel.impl.index.schema.fusion.FusionIndexProvider.DropAction;
import org.neo4j.kernel.impl.index.schema.fusion.FusionIndexProvider.Selector;
import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.values.storable.Value; import org.neo4j.values.storable.Value;


import static java.util.Arrays.stream;
import static org.neo4j.helpers.collection.Iterators.concatResourceIterators; import static org.neo4j.helpers.collection.Iterators.concatResourceIterators;
import static org.neo4j.helpers.collection.Iterators.iterator;
import static org.neo4j.kernel.impl.index.schema.fusion.SlotSelector.INSTANCE_COUNT;


class FusionIndexAccessor extends FusionIndexBase<IndexAccessor> implements IndexAccessor class FusionIndexAccessor extends FusionIndexBase<IndexAccessor> implements IndexAccessor
{ {
private final long indexId; private final long indexId;
private final SchemaIndexDescriptor descriptor; private final SchemaIndexDescriptor descriptor;
private final DropAction dropAction; private final DropAction dropAction;


FusionIndexAccessor( IndexAccessor[] accessors, FusionIndexAccessor( SlotSelector slotSelector,
Selector selector, Selector<IndexAccessor> selector,
long indexId, long indexId,
SchemaIndexDescriptor descriptor, SchemaIndexDescriptor descriptor,
DropAction dropAction ) DropAction dropAction )
{ {
super( accessors, selector ); super( slotSelector, selector );
this.indexId = indexId; this.indexId = indexId;
this.descriptor = descriptor; this.descriptor = descriptor;
this.dropAction = dropAction; this.dropAction = dropAction;
Expand All @@ -64,44 +62,46 @@ class FusionIndexAccessor extends FusionIndexBase<IndexAccessor> implements Inde
@Override @Override
public void drop() throws IOException public void drop() throws IOException
{ {
forAll( IndexAccessor::drop, instances ); forAll( IndexAccessor::drop, selector );
dropAction.drop( indexId ); dropAction.drop( indexId );
} }


@Override @Override
public IndexUpdater newUpdater( IndexUpdateMode mode ) public IndexUpdater newUpdater( IndexUpdateMode mode )
{ {
return new FusionIndexUpdater( instancesAs( IndexUpdater.class, accessor -> accessor.newUpdater( mode ) ), selector ); Selector<IndexUpdater> updaterSelector = new Selector<>( new IndexUpdater[INSTANCE_COUNT], slot -> selector.select( slot ).newUpdater( mode ) );
return new FusionIndexUpdater( slotSelector, updaterSelector );
} }


@Override @Override
public void force( IOLimiter ioLimiter ) throws IOException public void force( IOLimiter ioLimiter ) throws IOException
{ {
forAll( accessor -> accessor.force( ioLimiter ), instances ); forAll( accessor -> accessor.force( ioLimiter ), selector );
} }


@Override @Override
public void refresh() throws IOException public void refresh() throws IOException
{ {
forAll( IndexAccessor::refresh, instances ); forAll( IndexAccessor::refresh, selector );
} }


@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
forAll( IndexAccessor::close, instances ); forAll( IndexAccessor::close, selector );
} }


@Override @Override
public IndexReader newReader() public IndexReader newReader()
{ {
return new FusionIndexReader( instancesAs( IndexReader.class, IndexAccessor::newReader ), selector, descriptor ); Selector<IndexReader> readerSelector = new Selector<>( new IndexReader[INSTANCE_COUNT], slot -> selector.select( slot ).newReader() );
return new FusionIndexReader( slotSelector, readerSelector, descriptor );
} }


@Override @Override
public BoundedIterable<Long> newAllEntriesReader() public BoundedIterable<Long> newAllEntriesReader()
{ {
BoundedIterable<Long>[] entries = instancesAs( BoundedIterable.class, IndexAccessor::newAllEntriesReader ); BoundedIterable<Long>[] entries = instancesAs( new BoundedIterable[INSTANCE_COUNT], IndexAccessor::newAllEntriesReader );
return new BoundedIterable<Long>() return new BoundedIterable<Long>()
{ {
@Override @Override
Expand Down Expand Up @@ -147,30 +147,33 @@ public Iterator<Long> iterator()
@Override @Override
public ResourceIterator<File> snapshotFiles() throws IOException public ResourceIterator<File> snapshotFiles() throws IOException
{ {
List<ResourceIterator<File>> snapshots = new ArrayList<>(); return concatResourceIterators( iterator( instancesAs( new ResourceIterator[INSTANCE_COUNT], accessor -> accessor.snapshotFiles() ) ) );
forAll( accessor -> snapshots.add( accessor.snapshotFiles() ), instances );
return concatResourceIterators( snapshots.iterator() );
} }


@Override @Override
public void verifyDeferredConstraints( PropertyAccessor propertyAccessor ) public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
throws IndexEntryConflictException, IOException throws IndexEntryConflictException, IOException
{ {
for ( IndexAccessor accessor : instances ) for ( int slot = 0; slot < INSTANCE_COUNT; slot++ )
{ {
accessor.verifyDeferredConstraints( propertyAccessor ); selector.select( slot ).verifyDeferredConstraints( propertyAccessor );
} }
} }


@Override @Override
public boolean isDirty() public boolean isDirty()
{ {
return stream( instances ).anyMatch( IndexAccessor::isDirty ); boolean isDirty = false;
for ( int slot = 0; slot < INSTANCE_COUNT; slot++ )
{
isDirty |= selector.select( slot ).isDirty();
}
return isDirty;
} }


@Override @Override
public void validateBeforeCommit( Value[] tuple ) public void validateBeforeCommit( Value[] tuple )
{ {
selector.select( instances, tuple ).validateBeforeCommit( tuple ); selector.select( slotSelector.selectSlot( tuple, GROUP_OF ) ).validateBeforeCommit( tuple );
} }
} }
Expand Up @@ -19,14 +19,18 @@
*/ */
package org.neo4j.kernel.impl.index.schema.fusion; package org.neo4j.kernel.impl.index.schema.fusion;


import java.lang.reflect.Array;
import java.util.Arrays; import java.util.Arrays;
import java.util.function.Function;


import org.neo4j.collection.primitive.PrimitiveIntCollections; import org.neo4j.collection.primitive.PrimitiveIntCollections;
import org.neo4j.function.ThrowingConsumer; import org.neo4j.function.ThrowingConsumer;
import org.neo4j.function.ThrowingFunction; import org.neo4j.function.ThrowingFunction;
import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.api.index.IndexProvider; import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.ValueGroup;

import static org.neo4j.kernel.impl.index.schema.fusion.SlotSelector.INSTANCE_COUNT;


/** /**
* Acting as a simplifier for the multiplexing that is going in inside a fusion index. A fusion index consists of multiple parts, * Acting as a simplifier for the multiplexing that is going in inside a fusion index. A fusion index consists of multiple parts,
Expand All @@ -37,42 +41,80 @@
*/ */
public abstract class FusionIndexBase<T> public abstract class FusionIndexBase<T>
{ {
static final int INSTANCE_COUNT = 5; static Function<Value,ValueGroup> GROUP_OF = Value::valueGroup;

static final int STRING = 0;
static final int NUMBER = 1;
static final int SPATIAL = 2;
static final int TEMPORAL = 3;
static final int LUCENE = 4;


final T[] instances; final SlotSelector slotSelector;
final FusionIndexProvider.Selector selector; final Selector<T> selector;


FusionIndexBase( T[] instances, FusionIndexProvider.Selector selector ) FusionIndexBase( SlotSelector slotSelector, Selector<T> selector )
{ {
assert instances.length == INSTANCE_COUNT; this.slotSelector = slotSelector;
this.instances = instances;
this.selector = selector; this.selector = selector;
} }


<R,E extends Exception> R[] instancesAs( Class<R> cls, ThrowingFunction<T,R,E> converter ) throws E /**
* Short-hand for calling the static {@link #instancesAs(Selector, Object[], ThrowingFunction)}, here with the local {@link #selector}.
*/
<R,E extends Exception> R[] instancesAs( R[] target, ThrowingFunction<T,R,E> converter ) throws E
{ {
return instancesAs( instances, cls, converter ); return instancesAs( selector, target, converter );
} }


static <T,R,E extends Exception> R[] instancesAs( T[] instances, Class<R> cls, ThrowingFunction<T,R,E> converter ) throws E /**
* Convenience method typically for calling a method on each of the sub-parts of a fusion entity,
* one which creates another instance. All those instances are returned as an array, or actually put into an array
* created by the caller to avoid reflection to instantiate the array.
*
* @param selector {@link Selector} to use as the source.
* @param target array to put the created instances into, also returned.
* @param converter {@link ThrowingFunction} which converts from the source to target instance.
* @param <S> type of source instance.
* @param <T> type of target instance.
* @param <E> type of exception that converter may throw.
* @return the target array which was passed in, now populated.
* @throws E exception from converter.
*/
static <S,T,E extends Exception> T[] instancesAs( Selector<S> selector, T[] target, ThrowingFunction<S,T,E> converter ) throws E
{ {
R[] result = (R[]) Array.newInstance( cls, instances.length ); for ( int slot = 0; slot < INSTANCE_COUNT; slot++ )
for ( int i = 0; i < instances.length; i++ )
{ {
result[i] = converter.apply( instances[i] ); target[slot] = converter.apply( selector.select( slot ) );
}
return target;
}

static <T, E extends Exception> void forInstantiated( ThrowingConsumer<T,E> consumer, Selector<T> selector ) throws E
{
E exception = null;
for ( int slot = 0; slot < INSTANCE_COUNT; slot++ )
{
T instance = selector.getIfInstantiated( slot );
if ( instance != null )
{
exception = consume( exception, consumer, instance );
}
}
if ( exception != null )
{
throw exception;
}
}

public static <T, E extends Exception> void forAll( ThrowingConsumer<T,E> consumer, Selector<T> selector ) throws E
{
E exception = null;
for ( int slot = 0; slot < INSTANCE_COUNT; slot++ )
{
exception = consume( exception, consumer, selector.select( slot ) );
}
if ( exception != null )
{
throw exception;
} }
return result;
} }


/** /**
* NOTE: duplicate of {@link #forAll(ThrowingConsumer, Iterable)} to avoid having to wrap subjects of one form into another. * See {@link #forAll(ThrowingConsumer, Object[])}
* There are some real use cases for passing in an array instead of {@link Iterable} out there...
* *
* Method for calling a lambda function on many objects when it is expected that the function might * Method for calling a lambda function on many objects when it is expected that the function might
* throw an exception. First exception will be thrown and subsequent will be suppressed. * throw an exception. First exception will be thrown and subsequent will be suppressed.
Expand All @@ -90,21 +132,12 @@ static <T,R,E extends Exception> R[] instancesAs( T[] instances, Class<R> cls, T
* @param <E> the type of exception anticipated, inferred from the lambda * @param <E> the type of exception anticipated, inferred from the lambda
* @throws E if consumption fails with this exception * @throws E if consumption fails with this exception
*/ */
@SafeVarargs public static <T, E extends Exception> void forAll( ThrowingConsumer<T,E> consumer, Iterable<T> subjects ) throws E
public static <T, E extends Exception> void forAll( ThrowingConsumer<T,E> consumer, T... subjects ) throws E
{ {
// Duplicate this method for array to avoid creating a purely internal list to shove that in to the other method.
E exception = null; E exception = null;
for ( T subject : subjects ) for ( T instance : subjects )
{ {
try exception = consume( exception, consumer, instance );
{
consumer.accept( subject );
}
catch ( Exception e )
{
exception = Exceptions.chain( exception, (E) e );
}
} }
if ( exception != null ) if ( exception != null )
{ {
Expand All @@ -113,10 +146,6 @@ public static <T, E extends Exception> void forAll( ThrowingConsumer<T,E> consum
} }


/** /**
* See {@link #forAll(ThrowingConsumer, Object[])}
* NOTE: duplicate of {@link #forAll(ThrowingConsumer, Object[])} to avoid having to wrap subjects of one form into another.
* There are some real use cases for passing in an Iterable instead of array out there...
*
* Method for calling a lambda function on many objects when it is expected that the function might * Method for calling a lambda function on many objects when it is expected that the function might
* throw an exception. First exception will be thrown and subsequent will be suppressed. * throw an exception. First exception will be thrown and subsequent will be suppressed.
* *
Expand All @@ -133,24 +162,22 @@ public static <T, E extends Exception> void forAll( ThrowingConsumer<T,E> consum
* @param <E> the type of exception anticipated, inferred from the lambda * @param <E> the type of exception anticipated, inferred from the lambda
* @throws E if consumption fails with this exception * @throws E if consumption fails with this exception
*/ */
public static <T, E extends Exception> void forAll( ThrowingConsumer<T,E> consumer, Iterable<T> subjects ) throws E public static <T, E extends Exception> void forAll( ThrowingConsumer<T,E> consumer, T[] subjects ) throws E
{ {
E exception = null; forAll( consumer, Arrays.asList( subjects ) );
for ( T subject : subjects ) }

private static <E extends Exception, T> E consume( E exception, ThrowingConsumer<T,E> consumer, T instance )
{
try
{ {
try consumer.accept( instance );
{
consumer.accept( subject );
}
catch ( Exception e )
{
exception = Exceptions.chain( exception, (E) e );
}
} }
if ( exception != null ) catch ( Exception e )
{ {
throw exception; exception = Exceptions.chain( exception, (E) e );
} }
return exception;
} }


static void validateSelectorInstances( Object[] instances, int... aliveIndex ) static void validateSelectorInstances( Object[] instances, int... aliveIndex )
Expand Down

0 comments on commit 88020d3

Please sign in to comment.